This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 423330ebe7d KAFKA-19692 improve the docs of "clusterId" for
AddRaftVoterOptions and RemoveRaftVoterOptions (#20555)
423330ebe7d is described below
commit 423330ebe7d76697f6cb108530fcd4cb8be1db48
Author: Logan Zhu <[email protected]>
AuthorDate: Tue Sep 30 21:17:41 2025 +0800
KAFKA-19692 improve the docs of "clusterId" for AddRaftVoterOptions and
RemoveRaftVoterOptions (#20555)
Improves the documentation of the clusterId field in AddRaftVoterOptions
and RemoveRaftVoterOptions.
The changes include:
1. Adding Javadoc to both addRaftVoter and removeRaftVoter methods to
explain the behavior of the optional clusterId.
2. Adding integration tests to verify the correct behavior of
add and remove voter operations with and without clusterId, including
scenarios with inconsistent cluster ids.
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../admin/ReconfigurableQuorumIntegrationTest.java | 132 +++++++++++++++++++++
.../kafka/clients/admin/AddRaftVoterOptions.java | 9 ++
.../java/org/apache/kafka/clients/admin/Admin.java | 19 ++-
.../clients/admin/RemoveRaftVoterOptions.java | 9 ++
4 files changed, 167 insertions(+), 2 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
new file mode 100644
index 00000000000..f02db36c061
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
+import org.apache.kafka.common.test.KafkaClusterTestKit;
+import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class ReconfigurableQuorumIntegrationTest {
+
+ static Map<Integer, Uuid> descVoterDirs(Admin admin) throws
ExecutionException, InterruptedException {
+ var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
+ return
quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId,
QuorumInfo.ReplicaState::replicaDirectoryId));
+ }
+
+ @Test
+ public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+ final var nodes = new TestKitNodes.Builder()
+ .setClusterId("test-cluster")
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.waitForCondition(() -> {
+ Map<Integer, Uuid> voters = descVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ return voters.values().stream().noneMatch(directory ->
directory.equals(Uuid.ZERO_UUID));
+ }, "Initial quorum voters should be {3000, 3001, 3002} and all
should have non-zero directory IDs");
+
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ admin.removeRaftVoter(
+ 3000,
+ dirId,
+ new
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+ ).all().get();
+ TestUtils.waitForCondition(() -> {
+ Map<Integer, Uuid> voters = descVoterDirs(admin);
+ assertEquals(Set.of(3001, 3002), voters.keySet());
+ return voters.values().stream().noneMatch(directory ->
directory.equals(Uuid.ZERO_UUID));
+ }, "After removing voter 3000, remaining voters should be
{3001, 3002} with non-zero directory IDs");
+
+ admin.addRaftVoter(
+ 3000,
+ dirId,
+ Set.of(new RaftVoterEndpoint("CONTROLLER",
"example.com", 8080)),
+ new
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+ ).all().get();
+ }
+ }
+ }
+
+ @Test
+ public void testRemoveAndAddVoterWithInconsistentClusterId() throws
Exception {
+ final var nodes = new TestKitNodes.Builder()
+ .setClusterId("test-cluster")
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ var removeFuture = admin.removeRaftVoter(
+ 3000,
+ dirId,
+ new
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+ ).all();
+
TestUtils.assertFutureThrows(InconsistentClusterIdException.class,
removeFuture);
+
+ var addFuture = admin.addRaftVoter(
+ 3000,
+ dirId,
+ Set.of(new RaftVoterEndpoint("CONTROLLER",
"example.com", 8080)),
+ new
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+ ).all();
+
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+ }
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
index e28a03d541c..81e889db30d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
@@ -17,11 +17,20 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.protocol.Errors;
import java.util.Optional;
/**
* Options for {@link Admin#addRaftVoter}.
+ *
+ * <p>
+ * The clusterId is optional.
+ * <p>
+ * If provided, the request will only succeed if the cluster id matches the id
of the current cluster.
+ * If the cluster id does not match, the request will fail with {@link
Errors#INCONSISTENT_CLUSTER_ID}.
+ * <p>
+ * If not provided, the cluster id check is skipped.
*/
@InterfaceStability.Stable
public class AddRaftVoterOptions extends AbstractOptions<AddRaftVoterOptions> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index e596754f62f..06ede9f620d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -1866,10 +1867,17 @@ public interface Admin extends AutoCloseable {
/**
* Add a new voter node to the KRaft metadata quorum.
*
+ * <p>
+ * The clusterId in {@link AddRaftVoterOptions} is optional.
+ * If provided, the operation will only succeed if the cluster id matches
the id
+ * of the current cluster. If the cluster id does not match, the operation
+ * will fail with {@link InconsistentClusterIdException}.
+ * If not provided, the cluster id check is skipped.
+ *
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
- * @param options The options to use when adding the new voter
node.
+ * @param options Additional options for the operation,
including optional cluster ID.
*/
AddRaftVoterResult addRaftVoter(
int voterId,
@@ -1894,9 +1902,16 @@ public interface Admin extends AutoCloseable {
/**
* Remove a voter node from the KRaft metadata quorum.
*
+ * <p>
+ * The clusterId in {@link RemoveRaftVoterOptions} is optional.
+ * If provided, the operation will only succeed if the cluster id matches
the id
+ * of the current cluster. If the cluster id does not match, the operation
+ * will fail with {@link InconsistentClusterIdException}.
+ * If not provided, the cluster id check is skipped.
+ *
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
- * @param options The options to use when removing the voter
node.
+ * @param options Additional options for the operation,
including optional cluster ID.
*/
RemoveRaftVoterResult removeRaftVoter(
int voterId,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
index cb5fe563c19..da6e965ebe0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java
@@ -17,11 +17,20 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.protocol.Errors;
import java.util.Optional;
/**
* Options for {@link Admin#removeRaftVoter}.
+ *
+ * <p>
+ * The clusterId is optional.
+ * <p>
+ * If provided, the request will only succeed if the cluster id matches the id
of the current cluster.
+ * If the cluster id does not match, the request will fail with {@link
Errors#INCONSISTENT_CLUSTER_ID}.
+ * <p>
+ * If not provided, the cluster id check is skipped.
*/
@InterfaceStability.Stable
public class RemoveRaftVoterOptions extends
AbstractOptions<RemoveRaftVoterOptions> {