This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3751a838c1 KAFKA-17794: Add some formatting safeguards for KIP-853 (#17504)
e3751a838c1 is described below

commit e3751a838c1a58ba5fe2570b266a9ca1bca8fdae
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Mon Oct 21 10:06:41 2024 -0700

    KAFKA-17794: Add some formatting safeguards for KIP-853 (#17504)
    
    KIP-853 adds support for dynamic KRaft quorums. This means that the quorum topology is
    no longer statically determined by the controller.quorum.voters configuration. Instead, it
    is contained in the storage directories of each controller and broker.
    
    Users of dynamic quorums must format at least one controller storage directory with either
    the --initial-controllers or the --standalone flag. If they fail to do this, no quorum can be
    established. This PR changes the storage tool to fail with an error when a KIP-853 flag has
    not been supplied to format a KIP-853 controller (one on which controller.quorum.voters is
    not set). (Note that broker storage directories can continue to be formatted without a
    KIP-853 flag.)
    
    There are cases where we don't want to specify initial voters when formatting a controller. One
    example is where we format a single controller with --standalone, and then dynamically add 4
    more controllers with no initial topology. In this case, we want the 4 later controllers to grab
    the quorum topology from the initial one. To support this case, this PR adds the
    --no-initial-controllers flag.
    
    Reviewers: José Armando García Sancio <jsan...@apache.org>, Federico Valeri <fval...@redhat.com>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  | 31 ++++++++---
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 61 ++++++++++++++++++++--
 docs/ops.html                                      |  4 +-
 .../apache/kafka/metadata/storage/Formatter.java   |  6 ++-
 .../kafka/metadata/storage/FormatterTest.java      |  6 +--
 tests/kafkatest/services/kafka/kafka.py            |  9 ++--
 6 files changed, 96 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index cbff294ef45..e7deff0e62d 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.{Exit, Utils}
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
-import org.apache.kafka.raft.DynamicVoters
+import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.config.ReplicationConfigs
 
@@ -135,9 +135,20 @@ object StorageTool extends Logging {
         foreach(v => 
formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
     }
     Option(namespace.getString("initial_controllers")).
-      foreach(v => formatter.setInitialVoters(DynamicVoters.parse(v)))
+      foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
     if (namespace.getBoolean("standalone")) {
-      formatter.setInitialVoters(createStandaloneDynamicVoters(config))
+      formatter.setInitialControllers(createStandaloneDynamicVoters(config))
+    }
+    if (!namespace.getBoolean("no_initial_controllers")) {
+      if (config.processRoles.contains(ProcessRole.ControllerRole)) {
+        if (config.quorumConfig.voters().isEmpty) {
+          if (formatter.initialVoters().isEmpty()) {
+            throw new TerseFailure("Because " + 
QuorumConfig.QUORUM_VOTERS_CONFIG +
+              " is not set on this controller, you must specify one of the 
following: " +
+              "--standalone, --initial-controllers, or 
--no-initial-controllers.");
+          }
+        }
+      }
     }
     Option(namespace.getList("add_scram")).
       foreach(scramArgs => 
formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -238,7 +249,7 @@ object StorageTool extends Logging {
     config: KafkaConfig
   ): DynamicVoters = {
     if (!config.processRoles.contains(ProcessRole.ControllerRole)) {
-      throw new TerseFailure("You cannot use --standalone on a broker node.")
+      throw new TerseFailure("You can only use --standalone on a controller.")
     }
     if (config.effectiveAdvertisedControllerListeners.isEmpty) {
       throw new RuntimeException("No controller listeners found.")
@@ -306,12 +317,18 @@ object StorageTool extends Logging {
 
     val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
     reconfigurableQuorumOptions.addArgument("--standalone", "-s")
-      .help("Used to initialize a single-node quorum controller quorum.")
+      .help("Used to initialize a controller as a single-node dynamic quorum.")
+      .action(storeTrue())
+
+    reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
+      .help("Used to initialize a server without a dynamic quorum topology.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
-      .help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" +
-        "0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
+      .help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
+        "is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
+        "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
+        "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
       .action(store())
   }
 
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 9070f2febd4..e93cb84d002 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -177,8 +177,9 @@ Found problem:
   defaultDynamicQuorumProperties.setProperty("process.roles", "controller")
   defaultDynamicQuorumProperties.setProperty("node.id", "0")
   defaultDynamicQuorumProperties.setProperty("controller.listener.names", 
"CONTROLLER")
-  defaultDynamicQuorumProperties.setProperty("controller.quorum.voters", 
"0@localhost:9093")
-  defaultDynamicQuorumProperties.setProperty("listeners", 
"CONTROLLER://127.0.0.1:9093")
+  
defaultDynamicQuorumProperties.setProperty("controller.quorum.bootstrap.servers",
 "localhost:9093")
+  defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://:9093")
+  defaultDynamicQuorumProperties.setProperty("advertised.listeners", 
"CONTROLLER://127.0.0.1:9093")
   
defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG,
 "true")
   
defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG
 , "true")
 
@@ -378,7 +379,7 @@ Found problem:
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String]("--release-version", "3.9-IV0", 
"--standalone")
-    assertEquals("You cannot use --standalone on a broker node.",
+    assertEquals("You can only use --standalone on a controller.",
       assertThrows(classOf[TerseFailure],
         () => runFormatCommand(stream, properties, 
arguments.toSeq)).getMessage)
   }
@@ -437,6 +438,56 @@ Found problem:
       "Failed to find content in output: " + stream.toString())
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("controller", "broker,controller"))
+  def 
testFormatWithoutStaticQuorumFailsWithoutInitialControllersOnController(processRoles:
 String): Unit = {
+    val availableDirs = Seq(TestUtils.tempDir())
+    val properties = new Properties()
+    properties.putAll(defaultDynamicQuorumProperties)
+    if (processRoles.contains("broker")) {
+      properties.setProperty("listeners", 
"PLAINTEXT://:9092,CONTROLLER://:9093")
+      properties.setProperty("advertised.listeners", 
"PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093")
+    }
+    properties.setProperty("process.roles", processRoles)
+    properties.setProperty("log.dirs", availableDirs.mkString(","))
+    assertEquals("Because controller.quorum.voters is not set on this 
controller, you must " +
+      "specify one of the following: --standalone, --initial-controllers, or " 
+
+        "--no-initial-controllers.",
+          assertThrows(classOf[TerseFailure],
+            () => runFormatCommand(new ByteArrayOutputStream(), properties,
+              Seq("--release-version", "3.9-IV0"))).getMessage)
+  }
+
+  @Test
+  def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
+    val availableDirs = Seq(TestUtils.tempDir())
+    val properties = new Properties()
+    properties.putAll(defaultDynamicQuorumProperties)
+    properties.setProperty("log.dirs", availableDirs.mkString(","))
+    val stream = new ByteArrayOutputStream()
+    assertEquals(0, runFormatCommand(stream, properties,
+      Seq("--no-initial-controllers", "--release-version", "3.9-IV0")))
+    assertTrue(stream.toString().
+      contains("Formatting metadata directory %s".format(availableDirs.head)),
+      "Failed to find content in output: " + stream.toString())
+  }
+
+  @Test
+  def 
testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit 
= {
+    val availableDirs = Seq(TestUtils.tempDir())
+    val properties = new Properties()
+    properties.putAll(defaultDynamicQuorumProperties)
+    properties.setProperty("listeners", "PLAINTEXT://:9092")
+    properties.setProperty("advertised.listeners", 
"PLAINTEXT://127.0.0.1:9092")
+    properties.setProperty("process.roles", "broker")
+    properties.setProperty("log.dirs", availableDirs.mkString(","))
+    val stream = new ByteArrayOutputStream()
+    assertEquals(0, runFormatCommand(stream, properties, 
Seq("--release-version", "3.9-IV0")))
+    assertTrue(stream.toString().
+      contains("Formatting metadata directory %s".format(availableDirs.head)),
+      "Failed to find content in output: " + stream.toString())
+  }
+
   private def runVersionMappingCommand(
     stream: ByteArrayOutputStream,
     releaseVersion: String
@@ -620,7 +671,7 @@ Found problem:
   def testBootstrapScramRecords(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultDynamicQuorumProperties)
+    properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String](
@@ -647,7 +698,7 @@ Found problem:
   def testScramRecordsOldReleaseVersion(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultDynamicQuorumProperties)
+    properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String](
diff --git a/docs/ops.html b/docs/ops.html
index 5a5249aa56c..4301b64f1bd 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -3817,9 +3817,9 @@ This command is similar to the standalone version but the 
snapshot at 0000000000
 In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is 
the replica id, 3Db5QLSqSZieL3rJBUUegA is the replica directory id, 
controller-0 is the replica's host and 1234 is the replica's port.
 
   <h5 class="anchor-heading"><a id="kraft_storage_observers" 
class="anchor-link"></a><a href="#kraft_storage_observers">Formatting Brokers 
and New Controllers</a></h5>
-  When provisioning new broker and controller nodes that we want to add to an 
existing Kafka cluster, use the <code>kafka-storage.sh format</code> command 
without the --standalone or --initial-controllers flags.
+  When provisioning new broker and controller nodes that we want to add to an 
existing Kafka cluster, use the <code>kafka-storage.sh format</code> command 
with the --no-initial-controllers flag.
 
-  <pre><code class="language-bash">$ bin/kafka-storage.sh format --cluster-id 
&lt;cluster-id&gt; --config server.properties</code></pre>
+  <pre><code class="language-bash">$ bin/kafka-storage.sh format --cluster-id 
&lt;cluster-id&gt; --config server.properties 
--no-initial-controllers</code></pre>
 
   <h4 class="anchor-heading"><a id="kraft_reconfig" class="anchor-link"></a><a 
href="#kraft_reconfig">Controller membership changes</a></h4>
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 72995fb753e..847285c7448 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -202,11 +202,15 @@ public class Formatter {
         return this;
     }
 
-    public Formatter setInitialVoters(DynamicVoters initialControllers) {
+    public Formatter setInitialControllers(DynamicVoters initialControllers) {
         this.initialControllers = Optional.of(initialControllers);
         return this;
     }
 
+    public Optional<DynamicVoters> initialVoters() {
+        return initialControllers;
+    }
+
     boolean hasDynamicQuorum() {
         if (initialControllers.isPresent()) {
             return true;
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 75665cd3fb5..45a896c47c4 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -376,7 +376,7 @@ public class FormatterTest {
                 formatter1.formatter.setFeatureLevel("kraft.version", (short) 
1);
             }
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setInitialVoters(DynamicVoters.
+            formatter1.formatter.setInitialControllers(DynamicVoters.
                 parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.run();
             assertEquals(Arrays.asList(
@@ -407,7 +407,7 @@ public class FormatterTest {
             FormatterContext formatter1 = testEnv.newFormatter();
             formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setInitialVoters(DynamicVoters.
+            formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             assertTrue(formatter1.formatter.hasDynamicQuorum());
             assertEquals("Cannot set kraft.version to 0 if KIP-853 
configuration is present. " +
@@ -437,7 +437,7 @@ public class FormatterTest {
             FormatterContext formatter1 = testEnv.newFormatter();
             
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
             formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
-            formatter1.formatter.setInitialVoters(DynamicVoters.
+            formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             assertEquals("kraft.version could not be set to 1 because it 
depends on " +
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index f15ec2f3ff9..81005eeb323 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -869,9 +869,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % 
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
             if self.dynamicRaftQuorum:
                 cmd += " --feature kraft.version=1"
-                if not self.standalone_controller_bootstrapped and 
self.node_quorum_info.has_controller_role:
-                    cmd += " --standalone"
-                    self.standalone_controller_bootstrapped = True
+                if self.node_quorum_info.has_controller_role:
+                    if self.standalone_controller_bootstrapped:
+                        cmd += " --no-initial-controllers"
+                    else:
+                        cmd += " --standalone"
+                        self.standalone_controller_bootstrapped = True
             self.logger.info("Running log directory format command...\n%s" % 
cmd)
             node.account.ssh(cmd)
 

Reply via email to