kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2561429495


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,13 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if the starting voters contain the node id of this node, mark it as already joined
+            // because it is already in the voter set.
+            // Check using ReplicaKey (id + directoryId) to handle KIP-853 properly
+            ReplicaKey localReplicaKey = ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId);
+            hasAutoJoined = partitionState.lastVoterSet().isVoter(localReplicaKey);

Review Comment:
   ```suggestion
               hasAutoJoined = partitionState.lastVoterSet().isVoter(
                   ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId)
               );
   ```
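Some context for why the check keys on `ReplicaKey` (id + directoryId) rather than the bare node id: with KIP-853 identity, a node restarted with a freshly formatted metadata directory presents a different `ReplicaKey` even though its node id is unchanged. The sketch below illustrates this under simplifying assumptions. `LocalReplicaKey` and `VoterSetSketch` are hypothetical stand-ins, not Kafka's `ReplicaKey`/`VoterSet`, and the sketch assumes every key carries a directory id.

```java
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for a replica identity: node id plus metadata directory id.
record LocalReplicaKey(int nodeId, UUID directoryId) { }

// Hypothetical, simplified voter set; not Kafka's VoterSet API.
final class VoterSetSketch {
    private final Set<LocalReplicaKey> voters;

    VoterSetSketch(Set<LocalReplicaKey> voters) {
        this.voters = Set.copyOf(voters);
    }

    // Membership requires BOTH the node id and the directory id to match
    // (record equality compares all components).
    boolean isVoter(LocalReplicaKey key) {
        return voters.contains(key);
    }

    // Id-only check, shown for contrast: it cannot tell a reformatted disk apart.
    boolean containsNodeId(int nodeId) {
        return voters.stream().anyMatch(v -> v.nodeId() == nodeId);
    }
}
```

With node 1 registered under its old directory id, `isVoter` rejects node 1 presenting a new directory id, while the id-only check would still report a match.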



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2335,8 +2344,11 @@ private boolean handleAddVoterResponse(
         /* These error codes indicate the replica was successfully added or the leader is unable to
          * process the request. In either case, reset the update voter set timer to back off.
          */
-        if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
-            error == Errors.DUPLICATE_VOTER) {
+        if (error == Errors.NONE) {
+            quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
+            hasAutoJoined = true;

Review Comment:
   Take a look at this code from `AddVoterHandler#handleApiVersionsResponse`:
   ```
   current.setLastOffset(leaderState.appendVotersRecord(newVoters, currentTimeMs));
   if (!current.ackWhenCommitted()) { // <-- we execute the below when auto-join is enabled
       // complete the future to send response, but do not reset the state,
       // since the new voter set is not yet committed
       current.future().complete(RaftUtil.addVoterResponse(Errors.NONE, null));
   }
   ```
   When auto-join is enabled, we send the RPC response BEFORE the new VotersRecord is committed. The motivation is detailed in KIP-1186 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1186%3A+Update+AddRaftVoterRequest+RPC+to+support+auto-join); responding early is necessary for auto-join to be fully available.
   
   This means our local node may not actually have auto-joined even though we set `hasAutoJoined = true`. For example, what if the remote leader fails over before the VotersRecord adding the local node is committed? The new leader may not have that record, and because `hasAutoJoined` is already `true`, the local node stops retrying.
   
   The "more correct" place to set `hasAutoJoined = true` is in `pollFollower`, when we are in the `quorum.isVoter()` case AND `quorumConfig.autoJoin() == true`.
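A minimal sketch of the placement suggested above, under simplifying assumptions: an `AddRaftVoter` response only backs off the retry timer, and the flag is set only when a follower poll observes the local replica as a voter while auto-join is enabled. `AutoJoinTracker`, `onAddVoterResponse`, `onPollFollower`, and `shouldSendAddVoter` are hypothetical names, not `KafkaRaftClient` members, and a boolean parameter stands in for `quorum.isVoter()`.

```java
// Hypothetical sketch of follower-side auto-join bookkeeping; not KafkaRaftClient code.
final class AutoJoinTracker {
    private final boolean autoJoinEnabled;
    private final long backoffMs;
    private long backoffUntilMs = 0L;
    private boolean hasAutoJoined = false;

    AutoJoinTracker(boolean autoJoinEnabled, long backoffMs) {
        this.autoJoinEnabled = autoJoinEnabled;
        this.backoffMs = backoffMs;
    }

    // A successful response only tells us the leader appended the VotersRecord,
    // which may still be uncommitted, so back off without marking the node as joined.
    void onAddVoterResponse(long nowMs) {
        backoffUntilMs = nowMs + backoffMs;
    }

    // Called on every follower poll; isLocalVoter stands in for quorum.isVoter().
    void onPollFollower(boolean isLocalVoter) {
        if (autoJoinEnabled && isLocalVoter) {
            hasAutoJoined = true;
        }
    }

    // Keep retrying (after backoff) until the node observes itself in the voter set.
    boolean shouldSendAddVoter(long nowMs) {
        return !hasAutoJoined && autoJoinEnabled && nowMs >= backoffUntilMs;
    }

    boolean hasAutoJoined() {
        return hasAutoJoined;
    }
}
```

In the failover scenario, the leader's early `NONE` response followed by polls that never show the local node as a voter leaves the flag unset, so the node sends another request once the backoff expires.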



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3352,8 +3364,13 @@ private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
          * and are configured to auto join should attempt to automatically join the voter
          * set for the configured topic partition.
          */
-        return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
-            quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+        if (!partitionState.lastKraftVersion().isReconfigSupported() || !canBecomeVoter ||
+            !quorumConfig.autoJoin() || !state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
+            return false;
+        }
+
+        // Only attempt auto-join if we haven't already auto-joined
+        return !hasAutoJoined;

Review Comment:
   ```suggestion
           return !hasAutoJoined &&
               partitionState.lastKraftVersion().isReconfigSupported() &&
               canBecomeVoter &&
               quorumConfig.autoJoin() &&
               state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
   ```
   IMO this reads cleaner. It also short-circuits as soon as a node has successfully auto-joined and set `hasAutoJoined = true`, whereas the current code always evaluates the other preconditions first and only checks `hasAutoJoined` once all four pass.
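To illustrate the evaluation-order point: Java's `&&` stops at the first `false` operand, so testing `!hasAutoJoined` first skips the remaining checks once the flag is set. The `check` helper and `evaluations` counter are hypothetical instrumentation, not Kafka code; each `check(true)` stands in for one precondition such as `isReconfigSupported()` or `canBecomeVoter`.

```java
// Hypothetical instrumentation showing evaluation order; not Kafka code.
final class ShortCircuitDemo {
    // Number of precondition checks evaluated so far.
    static int evaluations = 0;

    // Stands in for one precondition; records that it was evaluated.
    static boolean check(boolean value) {
        evaluations++;
        return value;
    }

    // Suggested ordering: && stops at !hasAutoJoined once the flag is set.
    static boolean flagFirst(boolean hasAutoJoined) {
        return !hasAutoJoined && check(true) && check(true) && check(true) && check(true);
    }

    // Ordering in the PR: the preconditions run first, the flag is checked last.
    static boolean flagLast(boolean hasAutoJoined) {
        if (!check(true) || !check(true) || !check(true) || !check(true)) {
            return false;
        }
        return !hasAutoJoined;
    }
}
```

With the flag set and every precondition true, `flagFirst` evaluates no checks while `flagLast` evaluates all four before returning `false`.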



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,13 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if the starting voters contain the node id of this node, mark it as already joined

Review Comment:
   > if the starting voters contain the node id of this node, mark it as already joined
   
   This comment is inaccurate. We are checking `lastVoterSet()`, which is the voter set from the VotersRecord with the highest offset, not the "starting voters." Setting `hasAutoJoined` correctly here is where the complications of these new semantics become apparent.
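To make the distinction concrete, suppose voter sets are tracked by the offset of the VotersRecord that introduced them. Then the set at the highest offset can differ both from the earliest (bootstrap) set and from the latest set below the high watermark, i.e. the last set may include an uncommitted change. The sketch assumes a `TreeMap` keyed by offset and uses node ids in place of full `ReplicaKey`s; `VoterHistorySketch` and its methods are hypothetical and only loosely modeled on the partition state.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical history of voter sets keyed by VotersRecord offset; not Kafka code.
final class VoterHistorySketch {
    private final TreeMap<Long, Set<Integer>> votersByOffset = new TreeMap<>();

    void addVotersRecord(long offset, Set<Integer> voters) {
        votersByOffset.put(offset, Set.copyOf(voters));
    }

    // Earliest known set, e.g. the bootstrap voters. Assumes at least one record exists.
    Set<Integer> startingVoters() {
        return votersByOffset.firstEntry().getValue();
    }

    // Set from the record with the highest offset, which may not be committed yet.
    Set<Integer> lastVoterSet() {
        return votersByOffset.lastEntry().getValue();
    }

    // Latest set whose record offset is strictly below the high watermark (committed).
    Set<Integer> committedVoterSet(long highWatermark) {
        Map.Entry<Long, Set<Integer>> entry = votersByOffset.lowerEntry(highWatermark);
        return entry == null ? Set.of() : entry.getValue();
    }
}
```

After appending a record at offset 42 that adds node 4, `lastVoterSet()` already contains node 4 while the bootstrap set and the committed set at a high watermark of 40 do not.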



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,13 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if the starting voters contain the node id of this node, mark it as already joined
+            // because it is already in the voter set.
+            // Check using ReplicaKey (id + directoryId) to handle KIP-853 properly
+            ReplicaKey localReplicaKey = ReplicaKey.of(nodeId.getAsInt(), nodeDirectoryId);
+            hasAutoJoined = partitionState.lastVoterSet().isVoter(localReplicaKey);

Review Comment:
   I want to point out that the last voter set containing the local `ReplicaKey` does not necessarily mean the node got there through auto-join, but in practice I think this is okay.



##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -105,7 +105,7 @@ public class QuorumConfig {
 
     public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
     public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " +
-        "join the cluster metadata partition for its cluster id.";
+        "join the cluster metadata partition for its cluster id when the node startup.";

Review Comment:
   ```suggestion
           "join the cluster metadata partition for its cluster id during node startup. " +
           "This property can only be `true` when `process.roles` contains `controller`.";
   ```
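A hypothetical check for the constraint stated in the suggested doc string, assuming `QUORUM_PREFIX` resolves to `controller.quorum.` and that `process.roles` is a comma-separated list such as `broker,controller`. `AutoJoinConfigCheck` is illustrative only; this does not claim Kafka validates the property this way.

```java
import java.util.Arrays;
import java.util.Properties;

// Hypothetical validation sketch; not Kafka's config validation code.
final class AutoJoinConfigCheck {
    // Assumed full name: QUORUM_PREFIX ("controller.quorum.") + "auto.join.enable".
    static final String AUTO_JOIN = "controller.quorum.auto.join.enable";
    static final String PROCESS_ROLES = "process.roles";

    // Valid unless auto-join is enabled on a node without the controller role.
    static boolean isValid(Properties props) {
        boolean autoJoin = Boolean.parseBoolean(props.getProperty(AUTO_JOIN, "false"));
        boolean isController = Arrays.stream(props.getProperty(PROCESS_ROLES, "").split(","))
            .map(String::trim)
            .anyMatch("controller"::equals);
        return !autoJoin || isController;
    }
}
```

A broker-only node with auto-join enabled fails the check; adding `controller` to `process.roles`, or leaving auto-join at its `false` default, passes.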



-- 
This is an automated message from the Apache Git Service.