jsancio commented on code in PR #20551:
URL: https://github.com/apache/kafka/pull/20551#discussion_r2368926488


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -342,18 +339,21 @@ object StorageTool extends Logging {
 
     val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
     reconfigurableQuorumOptions.addArgument("--standalone", "-s")
-      .help("Used to initialize a controller as a single-node dynamic quorum.")
+      .help("Used to initialize a controller as a single-node dynamic quorum. 
When setting this flag, " +
+        "the controller.quorum.voters config must not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
-      .help("Used to initialize a server without a dynamic quorum topology.")
+      .help("Used to initialize a server without a dynamic quorum topology. 
When setting this flag, " +

Review Comment:
   Let's remove the word "topology" so that it matches the phrase used for --standalone.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -342,18 +339,21 @@ object StorageTool extends Logging {
 
     val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
     reconfigurableQuorumOptions.addArgument("--standalone", "-s")
-      .help("Used to initialize a controller as a single-node dynamic quorum.")
+      .help("Used to initialize a controller as a single-node dynamic quorum. 
When setting this flag, " +
+        "the controller.quorum.voters config must not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
-      .help("Used to initialize a server without a dynamic quorum topology.")
+      .help("Used to initialize a server without a dynamic quorum topology. 
When setting this flag, " +
+        "the controller.quorum.voters config should not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
       .help("Used to initialize a server with a specific dynamic quorum 
topology. The argument " +

Review Comment:
   Same here. Do you mind removing the word topology?



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -217,8 +217,8 @@ public Formatter setInitialControllers(DynamicVoters 
initialControllers) {
         return this;
     }
 
-    public Formatter setNoInitialControllersFlag(boolean 
noInitialControllersFlag) {
-        this.noInitialControllersFlag = noInitialControllersFlag;
+    public Formatter setHasDynamicQuorum(boolean staticVotersEmpty) {
+        this.hasDynamicQuorum = staticVotersEmpty;

Review Comment:
   The parameter name should be something like `hasDynamicQuorum`.



##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -495,10 +499,26 @@ public void 
testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
             formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            assertEquals("kraft.version could not be set to 1 because it 
depends on " +
-                "metadata.version level 21",
-                    assertThrows(IllegalArgumentException.class,
-                        formatter1.formatter::run).getMessage());
+            formatter1.formatter.setHasDynamicQuorum(true);
+            formatter1.formatter.run();
+            assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean 
emptyStaticVoters) throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+            // This MV does not support kraft.version = 1
+            formatter1.formatter.setHasDynamicQuorum(emptyStaticVoters);
+            formatter1.formatter.run();
+            if (emptyStaticVoters) {
+                assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+            } else {
+                assertEquals((short) 0, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 1));
+            }

Review Comment:
   Don't you want to differentiate between a missing value and the default in the 
test? Why not use `featureLevels.get("kraft.version")`? This comment applies to 
a few places in this file.



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -182,18 +181,31 @@ private KafkaConfig createNodeConfig(TestKitNode node) 
throws IOException {
             props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, 
brokerListenerName);
             props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
controllerListenerName);
 
-            StringBuilder quorumVoterStringBuilder = new StringBuilder();
-            String prefix = "";
-            for (int nodeId : nodes.controllerNodes().keySet()) {
-                quorumVoterStringBuilder.append(prefix).
-                    append(nodeId).
-                    append("@").
-                    append("localhost").
-                    append(":").
-                    
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
-                prefix = ",";
+            if (!standalone && initialVoterSet.isEmpty()) {
+                StringBuilder quorumVoterStringBuilder = new StringBuilder();
+                String prefix = "";
+                for (int nodeId : nodes.controllerNodes().keySet()) {
+                    quorumVoterStringBuilder.append(prefix).
+                        append(nodeId).
+                        append("@").
+                        append("localhost").
+                        append(":").
+                        
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
+                    prefix = ",";
+                }
+                props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
quorumVoterStringBuilder.toString());
+            } else {
+                StringBuilder bootstrapServersStringBuilder = new 
StringBuilder();
+                String prefix = "";
+                for (int nodeId : nodes.controllerNodes().keySet()) {
+                    bootstrapServersStringBuilder.append(prefix).
+                        append("localhost").
+                        append(":").
+                        
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
+                    prefix = ",";
+                }
+                props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServersStringBuilder.toString());

Review Comment:
   Thanks for fixing this.



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -348,20 +352,20 @@ private short effectiveKRaftFeatureLevel(Optional<Short> 
configuredKRaftVersionL
             if (configuredKRaftVersionLevel.get() == 0) {
                 if (hasDynamicQuorum()) {
                     throw new FormatterException(
-                        "Cannot set kraft.version to " +
-                        configuredKRaftVersionLevel.get() +
-                        " if one of the flags --standalone, 
--initial-controllers, or --no-initial-controllers is used. " +
+                        "Cannot set kraft.version to " + 
configuredKRaftVersionLevel.get() +
+                        " if controller.quorum.voters is empty and one of the 
flags --standalone, " +
+                        "--initial-controllers, or --no-initial-controllers is 
used. " +

Review Comment:
   Is this true? Doesn't the tool allow --no-initial-controllers when 
controller.quorum.voters is specified?



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -163,16 +164,12 @@ object StorageTool extends Logging {
     if (isStandalone) {
       formatter.setInitialControllers(createStandaloneDynamicVoters(config))
     }
-    if (namespace.getBoolean("no_initial_controllers")) {
-      formatter.setNoInitialControllersFlag(true)
-    } else {
-      if (config.processRoles.contains(ProcessRole.ControllerRole)) {
-        if (config.quorumConfig.voters().isEmpty && 
formatter.initialVoters().isEmpty) {
+    if (!namespace.getBoolean("no_initial_controllers") &&
+      config.processRoles.contains(ProcessRole.ControllerRole) &&
+      staticVotersEmpty && formatter.initialVoters().isEmpty) {

Review Comment:
   I would add a newline after `&&` and before 
`formatter.initialVoters().isEmpty`.



##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -495,10 +499,26 @@ public void 
testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
             formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            assertEquals("kraft.version could not be set to 1 because it 
depends on " +
-                "metadata.version level 21",
-                    assertThrows(IllegalArgumentException.class,
-                        formatter1.formatter::run).getMessage());
+            formatter1.formatter.setHasDynamicQuorum(true);
+            formatter1.formatter.run();
+            assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean 
emptyStaticVoters) throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+            // This MV does not support kraft.version = 1
+            formatter1.formatter.setHasDynamicQuorum(emptyStaticVoters);

Review Comment:
   This is a bit odd. Why not name the test parameter `hasDynamicQuorum`?



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -477,53 +487,41 @@ private void formatNode(
             } else {
                 formatter.setMetadataLogDirectory(Optional.empty());
             }
-            if 
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
-                StringBuilder dynamicVotersBuilder = new StringBuilder();
-                String prefix = "";
-                if (standalone) {
-                    if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
-                        final var controllerNode = 
nodes.controllerNodes().get(nodeId);
-                        dynamicVotersBuilder.append(
-                            String.format(
-                                "%d@localhost:%d:%s",
-                                controllerNode.id(),
-                                socketFactoryManager.
-                                    
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
-                                controllerNode.metadataDirectoryId()
-                            )
-                        );
-                        
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
-                    } else {
-                        formatter.setNoInitialControllersFlag(true);
-                    }
-                } else if (initialVoterSet.isPresent()) {
-                    for (final var controllerNode : 
initialVoterSet.get().entrySet()) {
-                        final var voterId = controllerNode.getKey();
-                        final var voterDirectoryId = controllerNode.getValue();
-                        dynamicVotersBuilder.append(prefix);
-                        prefix = ",";
-                        dynamicVotersBuilder.append(
-                            String.format(
-                                "%d@localhost:%d:%s",
-                                voterId,
-                                socketFactoryManager.
-                                    getOrCreatePortForListener(voterId, 
controllerListenerName),
-                                voterDirectoryId
-                            )
-                        );
-                    }
-                    
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
-                } else {
-                    for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
-                        int port = socketFactoryManager.
-                            getOrCreatePortForListener(controllerNode.id(), 
controllerListenerName);
-                        dynamicVotersBuilder.append(prefix);
-                        prefix = ",";
-                        
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
-                            controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
-                    }
+            StringBuilder dynamicVotersBuilder = new StringBuilder();
+            String prefix = "";
+            if (standalone) {
+                if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
+                    final var controllerNode = 
nodes.controllerNodes().get(nodeId);
+                    dynamicVotersBuilder.append(
+                        String.format(
+                            "%d@localhost:%d:%s",
+                            controllerNode.id(),
+                            socketFactoryManager.
+                                
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+                            controllerNode.metadataDirectoryId()
+                        )
+                    );
                     
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
                 }
+                formatter.setHasDynamicQuorum(true);

Review Comment:
   I don't understand the added comment. This executes when standalone, irrespective 
of the node id. Doesn't that mean --standalone and --no-initial-controllers?



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -348,20 +352,20 @@ private short effectiveKRaftFeatureLevel(Optional<Short> 
configuredKRaftVersionL
             if (configuredKRaftVersionLevel.get() == 0) {
                 if (hasDynamicQuorum()) {
                     throw new FormatterException(
-                        "Cannot set kraft.version to " +
-                        configuredKRaftVersionLevel.get() +
-                        " if one of the flags --standalone, 
--initial-controllers, or --no-initial-controllers is used. " +
+                        "Cannot set kraft.version to " + 
configuredKRaftVersionLevel.get() +
+                        " if controller.quorum.voters is empty and one of the 
flags --standalone, " +
+                        "--initial-controllers, or --no-initial-controllers is 
used. " +
                         "For dynamic controllers support, try removing the 
--feature flag for kraft.version."
                     );
                 }
             } else {
                 if (!hasDynamicQuorum()) {
                     throw new FormatterException(
-                        "Cannot set kraft.version to " +
-                        configuredKRaftVersionLevel.get() +
-                        " unless one of the flags --standalone, 
--initial-controllers, or --no-initial-controllers is used. " +
-                        "For dynamic controllers support, try using one of 
--standalone, --initial-controllers, or " +
-                        "--no-initial-controllers."
+                        "Cannot set kraft.version to " + 
configuredKRaftVersionLevel.get() +
+                        " unless controller.quorum.voters is empty and one of 
the flags --standalone, " +
+                        "--initial-controllers, or --no-initial-controllers  
is used. " +

Review Comment:
   Extra space after --no-initial-controllers



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java:
##########
@@ -93,11 +93,6 @@ public Builder setBootstrapMetadata(BootstrapMetadata 
bootstrapMetadata) {
             return this;
         }
 
-        public Builder setFeature(String featureName, short level) {
-            this.bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
-            return this;
-        }

Review Comment:
   Thanks for finally removing this configuration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to