This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 423330ebe7d KAFKA-19692 improve the docs of "clusterId" for 
AddRaftVoterOptions and RemoveRaftVoterOptions (#20555)
423330ebe7d is described below

commit 423330ebe7d76697f6cb108530fcd4cb8be1db48
Author: Logan Zhu <[email protected]>
AuthorDate: Tue Sep 30 21:17:41 2025 +0800

    KAFKA-19692 improve the docs of "clusterId" for AddRaftVoterOptions and 
RemoveRaftVoterOptions (#20555)
    
    Improves the documentation of the clusterId field in AddRaftVoterOptions
    and RemoveRaftVoterOptions.
    
    The changes include:
    1. Adding Javadoc to the addRaftVoter and removeRaftVoter methods, and to
    their options classes, to explain the behavior of the optional clusterId.
    2. Adding integration tests that verify the behavior of the add and remove
    voter operations with a matching clusterId and with an inconsistent
    clusterId.
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../admin/ReconfigurableQuorumIntegrationTest.java | 132 +++++++++++++++++++++
 .../kafka/clients/admin/AddRaftVoterOptions.java   |   9 ++
 .../java/org/apache/kafka/clients/admin/Admin.java |  19 ++-
 .../clients/admin/RemoveRaftVoterOptions.java      |   9 ++
 4 files changed, 167 insertions(+), 2 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
new file mode 100644
index 00000000000..f02db36c061
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
+import org.apache.kafka.common.test.KafkaClusterTestKit;
+import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class ReconfigurableQuorumIntegrationTest {
+
+    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws 
ExecutionException, InterruptedException {
+        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
+        return 
quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId,
 QuorumInfo.ReplicaState::replicaDirectoryId));
+    }
+
+    @Test
+    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+                .setClusterId("test-cluster")
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(3)
+                .build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                    controllerNode.id(),
+                    controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.waitForCondition(() -> {
+                    Map<Integer, Uuid> voters = descVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    return voters.values().stream().noneMatch(directory -> 
directory.equals(Uuid.ZERO_UUID));
+                }, "Initial quorum voters should be {3000, 3001, 3002} and all 
should have non-zero directory IDs");
+
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                admin.removeRaftVoter(
+                        3000,
+                        dirId,
+                        new 
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+                TestUtils.waitForCondition(() -> {
+                    Map<Integer, Uuid> voters = descVoterDirs(admin);
+                    assertEquals(Set.of(3001, 3002), voters.keySet());
+                    return voters.values().stream().noneMatch(directory -> 
directory.equals(Uuid.ZERO_UUID));
+                }, "After removing voter 3000, remaining voters should be 
{3001, 3002} with non-zero directory IDs");
+
+                admin.addRaftVoter(
+                        3000,
+                        dirId,
+                        Set.of(new RaftVoterEndpoint("CONTROLLER", 
"example.com", 8080)),
+                        new 
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveAndAddVoterWithInconsistentClusterId() throws 
Exception {
+        final var nodes = new TestKitNodes.Builder()
+                .setClusterId("test-cluster")
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(3)
+                .build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                    controllerNode.id(),
+                    controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                var removeFuture = admin.removeRaftVoter(
+                        3000,
+                        dirId,
+                        new 
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, 
removeFuture);
+
+                var addFuture = admin.addRaftVoter(
+                        3000,
+                        dirId,
+                        Set.of(new RaftVoterEndpoint("CONTROLLER", 
"example.com", 8080)),
+                        new 
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+            }
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
index e28a03d541c..81e889db30d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
@@ -17,11 +17,20 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.protocol.Errors;
 
 import java.util.Optional;
 
 /**
  * Options for {@link Admin#addRaftVoter}.
+ *
+ * <p>
+ * The clusterId is optional.
+ * <p>
+ * If provided, the request will only succeed if the cluster id matches the id 
of the current cluster.
+ * If the cluster id does not match, the request will fail with {@link 
Errors#INCONSISTENT_CLUSTER_ID}.
+ * <p>
+ * If not provided, the cluster id check is skipped.
  */
 @InterfaceStability.Stable
 public class AddRaftVoterOptions extends AbstractOptions<AddRaftVoterOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index e596754f62f..06ede9f620d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.FeatureUpdateFailedException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -1866,10 +1867,17 @@ public interface Admin extends AutoCloseable {
     /**
      * Add a new voter node to the KRaft metadata quorum.
      *
+     * <p>
+     * The clusterId in {@link AddRaftVoterOptions} is optional.
+     * If provided, the operation will only succeed if the cluster id matches 
the id
+     * of the current cluster. If the cluster id does not match, the operation
+     * will fail with {@link InconsistentClusterIdException}.
+     * If not provided, the cluster id check is skipped.
+     *
      * @param voterId           The node ID of the voter.
      * @param voterDirectoryId  The directory ID of the voter.
      * @param endpoints         The endpoints that the new voter has.
-     * @param options           The options to use when adding the new voter 
node.
+     * @param options           Additional options for the operation, 
including optional cluster ID.
      */
     AddRaftVoterResult addRaftVoter(
         int voterId,
@@ -1894,9 +1902,16 @@ public interface Admin extends AutoCloseable {
     /**
      * Remove a voter node from the KRaft metadata quorum.
      *
+     * <p>
+     * The clusterId in {@link RemoveRaftVoterOptions} is optional.
+     * If provided, the operation will only succeed if the cluster id matches 
the id
+     * of the current cluster. If the cluster id does not match, the operation
+     * will fail with {@link InconsistentClusterIdException}.
+     * If not provided, the cluster id check is skipped.
+     *
      * @param voterId           The node ID of the voter.
      * @param voterDirectoryId  The directory ID of the voter.
-     * @param options           The options to use when removing the voter 
node.
+     * @param options           Additional options for the operation, 
including optional cluster ID.
      */
     RemoveRaftVoterResult removeRaftVoter(
         int voterId,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
index cb5fe563c19..da6e965ebe0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
@@ -17,11 +17,20 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.protocol.Errors;
 
 import java.util.Optional;
 
 /**
  * Options for {@link Admin#removeRaftVoter}.
+ *
+ * <p>
+ * The clusterId is optional.
+ * <p>
+ * If provided, the request will only succeed if the cluster id matches the id 
of the current cluster.
+ * If the cluster id does not match, the request will fail with {@link 
Errors#INCONSISTENT_CLUSTER_ID}.
+ * <p>
+ * If not provided, the cluster id check is skipped.
  */
 @InterfaceStability.Stable
 public class RemoveRaftVoterOptions extends 
AbstractOptions<RemoveRaftVoterOptions> {

Reply via email to