kevin-wu24 commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2561593244


##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -105,7 +105,7 @@ public class QuorumConfig {
 
     public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
     public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " +
-        "join the cluster metadata partition for its cluster id.";
+        "join the cluster metadata partition for its cluster id when the node startup.";

Review Comment:
   "Controls whether a KRaft controller (<nodeId, directoryUUID> tuple) should
   automatically join the cluster metadata partition as a voter during node
   startup if the controller is not already a voter"
   
   There is probably a better way to word the above.
   
   This most accurately describes what we are changing the semantic to. I think
   I am okay with this definition of the semantic from a UX POV. I guess we do not
   really care whether the `ReplicaKey` was added by auto-join or not, just that it
   was added as a voter.
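   Here is a minimal sketch of the tuple-based notion of "already a voter". `ReplicaKeyMembership`, its nested `ReplicaKey` record, and `isVoter` are hypothetical stand-ins. They are not Kafka's actual `ReplicaKey` or `VoterSet` API. The only idea carried over from the comment is that identity is the `<nodeId, directoryUUID>` pair. Under that rule, a node that restarts with the same id but a new directory does not count as a voter.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical illustration only; not Kafka's ReplicaKey/VoterSet classes.
public class ReplicaKeyMembership {
    // Stand-in for a voter identity: the (nodeId, directoryId) tuple.
    // Record equality compares both components.
    record ReplicaKey(int id, UUID directoryId) { }

    // A node counts as "already a voter" only if both id and directory id match.
    static boolean isVoter(List<ReplicaKey> voters, ReplicaKey local) {
        return voters.contains(local);
    }

    public static void main(String[] args) {
        UUID dirA = new UUID(0L, 0xAL);
        UUID dirB = new UUID(0L, 0xBL);
        List<ReplicaKey> voters = List.of(new ReplicaKey(1, dirA), new ReplicaKey(2, dirA));

        System.out.println(isVoter(voters, new ReplicaKey(1, dirA))); // prints true
        // Same node id, different directory: a different replica, so not a voter.
        System.out.println(isVoter(voters, new ReplicaKey(1, dirB))); // prints false
    }
}
```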
   
   Our proposed semantic is essentially to make the auto-join code "at most
   once" for the lifetime of a given controller process execution, conditioned
   on the local ReplicaKey not being part of our voter set. However, this
   semantic can run into UX issues too.
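   The "at most once" semantic can be sketched as a small gate. This is a hypothetical illustration, not the PR's implementation. `AutoJoinGate`, `shouldSendAddVoter`, and `onAddVoterSuccess` are invented names. The flag is called `hasJoined` after the rename proposed later in this review. Initializing the flag from the starting voter set is an assumption taken from the test comments, which say a bootstrap voter starts with auto-join disabled. The two scenarios in `main` mirror `testBootstrapVoterSetDoesNotSendAddVoterAfterRemove` and `testSuccessfulAddVoterSetsCanAutoJoinAgain`.

```java
import java.util.Set;

// Hypothetical sketch of an "at most once per process" auto-join gate; not the PR's code.
public class AutoJoinGate {
    // Simplified stand-in for the (nodeId, directoryId) tuple.
    record ReplicaKey(int id, String directoryId) { }

    private final ReplicaKey local;
    // True once this process has been a voter, either because it was in the
    // starting voter set or because an AddVoter request succeeded.
    // This sketch never resets it.
    private volatile boolean hasJoined;

    AutoJoinGate(ReplicaKey local, Set<ReplicaKey> startingVoters) {
        this.local = local;
        this.hasJoined = startingVoters.contains(local);
    }

    // Auto-join only if this process has never joined and is not currently a voter.
    boolean shouldSendAddVoter(Set<ReplicaKey> currentVoters) {
        return !hasJoined && !currentVoters.contains(local);
    }

    void onAddVoterSuccess() {
        hasJoined = true;
    }

    public static void main(String[] args) {
        ReplicaKey leader = new ReplicaKey(1, "dir-1");
        ReplicaKey follower = new ReplicaKey(2, "dir-2");
        ReplicaKey self = new ReplicaKey(3, "dir-3");
        Set<ReplicaKey> withoutSelf = Set.of(leader, follower);

        // Bootstrap voter that is later removed: does not re-send AddVoter.
        AutoJoinGate bootstrap = new AutoJoinGate(self, Set.of(leader, follower, self));
        System.out.println(bootstrap.shouldSendAddVoter(withoutSelf)); // prints false

        // Observer that joins once, is removed, and does not try again.
        AutoJoinGate observer = new AutoJoinGate(self, withoutSelf);
        System.out.println(observer.shouldSendAddVoter(withoutSelf)); // prints true
        observer.onAddVoterSuccess();
        System.out.println(observer.shouldSendAddVoter(withoutSelf)); // prints false
    }
}
```

   The sketch never clears the flag. As a result, a controller removed by RemoveVoter stays an observer until its process restarts, which is exactly the "at most once per process execution" behavior described above.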



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java:
##########
@@ -317,6 +321,131 @@ private void pollAndDeliverFetchToUpdateVoterSet(
         context.client.poll();
     }
 
+    @Test
+    public void testBootstrapVoterSetDoesNotSendAddVoterAfterRemove() throws Exception {
+        // Test that a bootstrap voter node, after being removed by RemoveVoter RPC,
+        // will NOT automatically rejoin (canAutoJoin remains false)
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var follower = replicaKey(leader.id() + 1, true);
+        final var thisVoter = replicaKey(follower.id() + 1, true);
+
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            thisVoter.id(),
+            thisVoter.directoryId().get())  // Same directory ID -> node is voter
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(leader, follower, thisVoter)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(true)
+            .build();
+
+        // Node should be a follower (voter) initially
+        assertTrue(context.client.quorum().isFollower());
+
+        // Complete initial fetch
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // Simulate this node being removed by RemoveVoter RPC (initiated by another node)
+        // Update voter set via fetch - this node is now removed from voter set
+        pollAndDeliverFetchToUpdateVoterSet(
+            context,
+            epoch,
+            VoterSetTest.voterSet(Stream.of(leader, follower))  // This node is no longer in voter set
+        );
+
+        // Node should now be an observer after being removed
+        assertTrue(context.client.quorum().isObserver());
+
+        // Advance time to expire update voter set timer
+        context.time.sleep(context.fetchTimeoutMs);
+
+        // Poll and verify that NO AddVoter request is sent
+        // With canAutoJoin=false (set during init because ID was in voter set),
+        // node should NOT automatically rejoin
+        context.time.sleep(1);
+        context.pollUntilRequest();
+
+        // Should not include AddVoter
+        assertEquals(0, context.channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER)).size());
+
+    }
+
+    @Test
+    public void testSuccessfulAddVoterSetsCanAutoJoinAgain() throws Exception {
+        // Test scenario:
+        // 1. Node initializes as observer
+        // 2. Node joins as follower via AddVoter
+        // 3. Node is removed via RemoveVoter and becomes observer again
+        // 4. Node should NOT send AddVoter request again
+        final var leader = replicaKey(randomReplicaId(), true);
+        final var follower = replicaKey(leader.id() + 1, true);
+        final var newVoter = replicaKey(follower.id() + 1, true);
+        final int epoch = 1;
+        final var context = new RaftClientTestContext.Builder(
+            newVoter.id(),
+            newVoter.directoryId().get())
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withStartingVoters(
+                    VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, leader.id())
+            .withAutoJoin(true)
+            .withCanBecomeVoter(true)
+            .build();
+
+        // 1. Node should be an observer initially
+        assertTrue(context.client.quorum().isObserver());
+
+        context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+        // 2. Send AddVoter request to join cluster
+        final var addVoterRequest = pollAndSendAddVoter(context, newVoter);
+
+        // Successfully add voter - this should set canAutoJoin to false
+        context.deliverResponse(
+            addVoterRequest.correlationId(),
+            addVoterRequest.destination(),
+            RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
+        );
+
+        // Poll to process the response
+        context.client.poll();
+
+        // Update voter set via fetch - node is now a voter
+        pollAndDeliverFetchToUpdateVoterSet(
+            context,
+            epoch,
+            VoterSetTest.voterSet(Stream.of(leader, follower, newVoter))
+        );
+
+        // Node should now be a follower (voter)
+        assertTrue(context.client.quorum().isFollower());
+
+        // 3. Simulate this node being removed by RemoveVoter RPC
+        pollAndDeliverFetchToUpdateVoterSet(
+            context,
+            epoch,
+            VoterSetTest.voterSet(Stream.of(leader, follower))  // newVoter is removed
+        );
+
+        // Node should now be an observer after being removed
+        assertTrue(context.client.quorum().isObserver());
+
+        // Advance time to expire update voter set timer
+        context.time.sleep(context.fetchTimeoutMs);
+
+        // 4. Poll and verify that NO AddVoter request is sent
+        // Because canAutoJoin was set to false after successful AddVoter

Review Comment:
   Please update the comments around `canAutoJoin` since we renamed the boolean.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -222,6 +222,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
     private volatile RemoveVoterHandler removeVoterHandler;
     private volatile UpdateVoterHandler updateVoterHandler;
 
+    private volatile boolean hasAutoJoined = false;

Review Comment:
   Let's rename this to `hasJoined`.


