kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2560073825


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3356,30 +3366,56 @@ private boolean 
shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
             quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
     }
 
+    private boolean shouldSendAddVoterRequest(FollowerState state, long 
currentTimeMs) {
+        return canAutoJoin && shouldAutoJoin(state, currentTimeMs);
+    }
+
+    private boolean shouldSendRemoveVoterRequest(FollowerState state, long 
currentTimeMs) {
+        final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+        final var voters = partitionState.lastVoterSet();
+

Review Comment:
   In my opinion, this introduces too many boolean states, which makes the logic 
difficult to reason about. If we add an additional `&& !hasAutoJoined` to the 
`shouldSendAddOrRemoveVoterRequest` predicate and ensure `hasAutoJoined` is set 
properly, we can keep all of the existing auto-join code the same.
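
   A rough sketch of the shape I have in mind. This is a simplified stand-in, 
not the actual `KafkaRaftClient` code: the class name `AutoJoinPredicate` and the 
boolean parameters are hypothetical, and any other conditions of the real 
predicate are left out.

```java
// Simplified stand-in for the predicate in KafkaRaftClient; not the real implementation.
final class AutoJoinPredicate {
    // Proposed flag: true once this process is, or has become, a voter.
    private volatile boolean hasAutoJoined = false;

    // Hypothetical hook, called when the flag needs to be set.
    void markAutoJoined() {
        hasAutoJoined = true;
    }

    // autoJoinConfigured stands in for quorumConfig.autoJoin() and
    // updatePeriodExpired for state.hasUpdateVoterSetPeriodExpired(currentTimeMs).
    // Other conditions of the real predicate are omitted in this sketch.
    boolean shouldSendAddOrRemoveVoterRequest(boolean autoJoinConfigured, boolean updatePeriodExpired) {
        return autoJoinConfigured && updatePeriodExpired && !hasAutoJoined;
    }
}
```

   With this, the callers of the predicate would not need to change; only the 
places that set the flag are new.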



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3347,7 +3357,7 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
         );
     }
 
-    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+    private boolean shouldAutoJoin(FollowerState state, long currentTimeMs) {

Review Comment:
   Let's keep this method name as is.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,11 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the 
initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if the starting voters contain the node id of this node, it 
can't auto join to the cluster
+            // because it is already in.
+            canAutoJoin = 
!partitionState.lastVoterSet().voterIds().contains(nodeId.getAsInt());
+        }

Review Comment:
   I think I understand what this is trying to do: if the local node is already 
part of the voter set, we should avoid sending `AddVoterRPC`. However, there is 
an issue with this code:
   
   With KIP-853 enabled, a voter is no longer identified by a single Integer ID 
but by a tuple of `<Integer, UUID>`, represented by the `ReplicaKey` class, so 
checking the ID alone is not sufficient. See `QuorumState#isVoter()` for the 
right way to do this check.
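
   To illustrate the difference, here is a small self-contained sketch. The 
types `ReplicaKeySketch` and `VoterCheckSketch` are hypothetical stand-ins, not 
the real Kafka classes, and the sketch assumes the directory id is always 
present, which the real code may handle differently.

```java
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Simplified stand-in for ReplicaKey: a node id plus a directory id.
final class ReplicaKeySketch {
    final int id;
    final UUID directoryId;

    ReplicaKeySketch(int id, UUID directoryId) {
        this.id = id;
        this.directoryId = Objects.requireNonNull(directoryId);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ReplicaKeySketch)) return false;
        ReplicaKeySketch other = (ReplicaKeySketch) o;
        return id == other.id && directoryId.equals(other.directoryId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, directoryId);
    }
}

final class VoterCheckSketch {
    // Id-only check: ignores the directory id, like the voterIds().contains(...) call in the diff.
    static boolean isVoterById(Set<ReplicaKeySketch> voters, int nodeId) {
        return voters.stream().anyMatch(v -> v.id == nodeId);
    }

    // Full-key check: both the node id and the directory id must match.
    static boolean isVoterByKey(Set<ReplicaKeySketch> voters, ReplicaKeySketch local) {
        return voters.contains(local);
    }
}
```

   For example, if the voter set contains node 1 with one directory id and the 
local node is node 1 with a different directory id, the id-only check reports 
it as a voter while the full-key check does not.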



##########
docs/upgrade.html:
##########
@@ -171,8 +171,8 @@ <h5><a id="upgrade_420_notable" 
href="#upgrade_420_notable">Notable changes in 4
         </ul>
     </li>
     <li>
-        The <code>controller.quorum.auto.join.enable</code> has been added to 
<code>QuorumConfig</code>, enabling KRaft controllers to automatically join the 
cluster's voter set,
-        and defaults to false.
+        The <code>controller.quorum.auto.join.enable</code> has been added to 
<code>QuorumConfig</code>, enabling KRaft controllers to automatically join the 
cluster's voter set
+        when the node startup, and defaults to false.

Review Comment:
   We should also add the documentation from the KIP here to make this 
clearer: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217391519#KIP853:KRaftControllerMembershipChanges-controller.quorum.auto.join.enable



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -222,6 +222,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     private volatile RemoveVoterHandler removeVoterHandler;
     private volatile UpdateVoterHandler updateVoterHandler;
 
+    private volatile boolean canAutoJoin = true;

Review Comment:
   It seems to me that the intention of the change is the following semantics: 
   
   For each lifetime of the controller process on a KRaft node, the node should 
be in the "auto-joining" state at most once (i.e. Observer state && 
`shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)`). This means that 
when we remove a node and it goes back to Observer state, the removed node will 
not automatically rejoin via `AddVoterRPC`. However, if the user removes the 
node as a voter and then the node restarts, the node will still try to 
auto-join the voter set.
   
   I think a better name for this boolean is `hasAutoJoined`, with a value on 
startup of `false`. In `initialize()`, each controller node checks its 
`partitionState.lastVoterSet()` and sets `hasAutoJoined = true` if it is 
already part of the voter set. Whenever an observer that has auto-join enabled 
successfully joins the voter set, we can set `hasAutoJoined = true`.
   
   The main problem with this functionality is that we don't know whether 
previous additions to the voter set were the result of auto-join or of manually 
adding the voter. I'm not sure how big an issue that is, but there are a lot of 
edge cases around this feature if we adopt semantics like the above. That is one 
reason why we wanted to keep the prerequisite for executing the auto-join code 
simple.
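
   To make the lifecycle concrete, here is a toy model of a single controller 
process. Everything in it is hypothetical: `ControllerProcessSketch`, 
`onAddVoterSucceeded` and `wouldAutoJoin` are not real Kafka names, and voters 
are keyed by a plain string instead of `ReplicaKey` to keep it short.

```java
import java.util.Set;

// Toy model of one controller process lifetime; not the real KafkaRaftClient.
final class ControllerProcessSketch {
    private final String localKey;        // stands in for the local ReplicaKey
    private final boolean autoJoinEnabled;
    // A new instance models a process restart, so the flag starts as false again.
    private volatile boolean hasAutoJoined = false;

    ControllerProcessSketch(String localKey, boolean autoJoinEnabled) {
        this.localKey = localKey;
        this.autoJoinEnabled = autoJoinEnabled;
    }

    // Mirrors the proposed check in initialize(): already a voter means nothing to join.
    void initialize(Set<String> lastVoterSet) {
        if (lastVoterSet.contains(localKey)) {
            hasAutoJoined = true;
        }
    }

    // Hypothetical hook for "the observer successfully joined the voter set".
    void onAddVoterSucceeded() {
        hasAutoJoined = true;
    }

    // The node auto-joins only as an observer, and at most once per process lifetime.
    boolean wouldAutoJoin(boolean isObserver) {
        return autoJoinEnabled && isObserver && !hasAutoJoined;
    }
}
```

   In this model, a node that joined and was later removed stays out for the 
rest of the process lifetime, but a restarted process that is not in the voter 
set would try to auto-join again, which is the edge case described above.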



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
