This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 857b1e92cc5 KAFKA-19719: --no-initial-controllers should not assume
kraft.version=1 (#20551)
857b1e92cc5 is described below
commit 857b1e92cc5c75eb178e48613e5963755bc1b03b
Author: Kevin Wu <[email protected]>
AuthorDate: Thu Sep 25 11:56:16 2025 -0500
KAFKA-19719: --no-initial-controllers should not assume kraft.version=1
(#20551)
Just because a controller node sets --no-initial-controllers flag does
not mean it is necessarily running kraft.version=1. The more precise
meaning is that the controller node being formatted does not know what
kraft version the cluster should be in, and therefore it is only safe to
assume kraft.version=0. Only by setting
--standalone,--initial-controllers, or --no-initial-controllers
AND not specifying the controller.quorum.voters static config, is it
known kraft.version > 0.
For example, it is a valid configuration (although confusing) to run a
static quorum defined by controller.quorum.voters but have all the
controllers format with --no-initial-controllers. In this case,
specifying --no-initial-controllers alongside a metadata version that
does not support kraft.version=1 causes formatting to fail, which is
a regression.
Additionally, the formatter should not check the kraft.version against
the release version, since kraft.version does not actually depend on any
release version. It should only check the kraft.version against the
static voters config/format arguments.
This PR also cleans up the integration test framework to match the
semantics of formatting an actual cluster.
Reviewers: TengYao Chi <[email protected]>, Kuan-Po Tseng
<[email protected]>, Chia-Ping Tsai <[email protected]>, José
Armando García Sancio <[email protected]>
---
core/src/main/scala/kafka/tools/StorageTool.scala | 27 ++---
.../ReconfigurableQuorumIntegrationTest.java | 51 ++++++---
.../kafka/server/KRaftClusterTest.scala | 3 +-
.../scala/unit/kafka/tools/StorageToolTest.scala | 9 +-
docs/ops.html | 10 +-
.../apache/kafka/metadata/storage/Formatter.java | 29 +++--
.../kafka/metadata/storage/FormatterTest.java | 103 +++++++----------
.../apache/kafka/server/common/KRaftVersion.java | 7 +-
.../kafka/common/test/KafkaClusterTestKit.java | 122 ++++++++++-----------
.../org/apache/kafka/common/test/TestKitNodes.java | 5 -
10 files changed, 171 insertions(+), 195 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index c342ddfe071..d8048d4d0aa 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -144,8 +144,9 @@ object StorageTool extends Logging {
})
val initialControllers = namespace.getString("initial_controllers")
val isStandalone = namespace.getBoolean("standalone")
- if (!config.quorumConfig.voters().isEmpty &&
- (Option(initialControllers).isDefined || isStandalone)) {
+ val staticVotersEmpty = config.quorumConfig.voters().isEmpty
+ formatter.setHasDynamicQuorum(staticVotersEmpty)
+ if (!staticVotersEmpty && (Option(initialControllers).isDefined ||
isStandalone)) {
throw new TerseFailure("You cannot specify " +
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
"with --initial-controllers or --standalone. " +
@@ -158,16 +159,13 @@ object StorageTool extends Logging {
if (isStandalone) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
- if (namespace.getBoolean("no_initial_controllers")) {
- formatter.setNoInitialControllersFlag(true)
- } else {
- if (config.processRoles.contains(ProcessRole.ControllerRole)) {
- if (config.quorumConfig.voters().isEmpty &&
formatter.initialVoters().isEmpty) {
+ if (!namespace.getBoolean("no_initial_controllers") &&
+ config.processRoles.contains(ProcessRole.ControllerRole) &&
+ staticVotersEmpty &&
+ formatter.initialVoters().isEmpty) {
throw new TerseFailure("Because " +
QuorumConfig.QUORUM_VOTERS_CONFIG +
" is not set on this controller, you must specify one of the
following: " +
"--standalone, --initial-controllers, or
--no-initial-controllers.");
- }
- }
}
Option(namespace.getList("add_scram")).
foreach(scramArgs =>
formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -336,18 +334,21 @@ object StorageTool extends Logging {
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
- .help("Used to initialize a controller as a single-node dynamic quorum.")
+ .help("Used to initialize a controller as a single-node dynamic quorum.
When setting this flag, " +
+ "the controller.quorum.voters config must not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
- .help("Used to initialize a server without a dynamic quorum topology.")
+ .help("Used to initialize a server without specifying a dynamic quorum.
When setting this flag, " +
+ "the controller.quorum.voters config should not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
- .help("Used to initialize a server with a specific dynamic quorum
topology. The argument " +
+ .help("Used to initialize a server with the specified dynamic quorum.
The argument " +
"is a comma-separated list of id@hostname:port:directory. The same
values must be used to " +
"format all nodes. For
example:\[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
-
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
+
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When
setting this flag, " +
+ "the controller.quorum.voters config must not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(store())
}
diff --git
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
index ad4193a0cb9..c67e941dd7a 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -84,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest {
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build()
- ).build()) {
+ ).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -108,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest {
@Test
public void testRemoveController() throws Exception {
- try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(3).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
- build()
- ).build()) {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -133,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest {
@Test
public void testRemoveAndAddSameController() throws Exception {
- try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(4).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
- build()).build()
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(4).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setInitialVoterSet(initialVoters).
+ build()
) {
cluster.format();
cluster.startup();
@@ -173,7 +192,6 @@ public class ReconfigurableQuorumIntegrationTest {
final var nodes = new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(3).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build();
try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
@@ -199,7 +217,6 @@ public class ReconfigurableQuorumIntegrationTest {
final var nodes = new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(3).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build();
// Configure the initial voters with one voter having a different
directory ID.
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 27b06daed21..6f552a8ebe9 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1035,8 +1035,7 @@ class KRaftClusterTest {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
- setNumControllerNodes(1).
- setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+ setNumControllerNodes(1).build()).setStandalone(true).build()
try {
cluster.format()
cluster.startup()
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 1e938ea9cd8..a36ad51572a 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -605,19 +605,14 @@ Found problem:
Seq("--release-version", "3.9-IV0"))).getMessage)
}
- @ParameterizedTest
- @ValueSource(booleans = Array(false, true))
- def
testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature:
Boolean): Unit = {
+ @Test
+ def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0",
"--no-initial-controllers")
- if (setKraftVersionFeature) {
- arguments += "--feature"
- arguments += "kraft.version=1"
- }
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
diff --git a/docs/ops.html b/docs/ops.html
index 803b429f18f..2d050ec76da 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4099,14 +4099,8 @@ Feature: metadata.version SupportedMinVersion:
3.3-IV3 SupportedMaxVers
<p>
The static versus dynamic nature of the quorum is determined at the time
of formatting.
Specifically, the quorum will be formatted as dynamic if
<code>controller.quorum.voters</code> is
- <b>not</b> present, and if the software version is Apache Kafka 3.9 or
newer. If you have
- followed the instructions earlier in this document, you will get a dynamic
quorum.<p>
-
- If you would like the formatting process to fail if a dynamic quorum
cannot be achieved, format your
- controllers using the <code>--feature kraft.version=1</code>. (Note that
you should not supply
- this flag when formatting brokers -- only when formatting controllers.)<p>
-
- <pre><code class="language-bash">$ bin/kafka-storage.sh format -t
KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller.properties</code></pre>
+ <b>not</b> present, and one of --standalone, --initial-controllers, or
--no-initial-controllers is set.
+ If you have followed the instructions earlier in this document, you will
get a dynamic quorum.
<p>
Note: To migrate from static voter set to dynamic voter set, please refer to
the <a href="#kraft_upgrade">Upgrade</a> section.
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index daac6630550..a036192fabb 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -131,7 +131,7 @@ public class Formatter {
* The initial KIP-853 voters.
*/
private Optional<DynamicVoters> initialControllers = Optional.empty();
- private boolean noInitialControllersFlag = false;
+ private boolean hasDynamicQuorum = false;
public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream;
@@ -217,8 +217,8 @@ public class Formatter {
return this;
}
- public Formatter setNoInitialControllersFlag(boolean
noInitialControllersFlag) {
- this.noInitialControllersFlag = noInitialControllersFlag;
+ public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
+ this.hasDynamicQuorum = hasDynamicQuorum;
return this;
}
@@ -227,7 +227,7 @@ public class Formatter {
}
boolean hasDynamicQuorum() {
- return initialControllers.isPresent() || noInitialControllersFlag;
+ return hasDynamicQuorum;
}
public BootstrapMetadata bootstrapMetadata() {
@@ -337,8 +337,8 @@ public class Formatter {
/**
* Calculate the effective feature level for kraft.version. In order to
keep existing
* command-line invocations of StorageTool working, we default this to 0
if no dynamic
- * voter quorum arguments were provided. As a convenience, if dynamic
voter quorum arguments
- * were passed, we set the latest kraft.version. (Currently there is only
1 non-zero version).
+ * voter quorum arguments were provided. As a convenience, if the static
voters config is
+ * empty, we set the latest kraft.version. (Currently there is only 1
non-zero version).
*
* @param configuredKRaftVersionLevel The configured level for
kraft.version
* @return The effective feature level.
@@ -348,20 +348,19 @@ public class Formatter {
if (configuredKRaftVersionLevel.get() == 0) {
if (hasDynamicQuorum()) {
throw new FormatterException(
- "Cannot set kraft.version to " +
- configuredKRaftVersionLevel.get() +
- " if one of the flags --standalone,
--initial-controllers, or --no-initial-controllers is used. " +
- "For dynamic controllers support, try removing the
--feature flag for kraft.version."
+ "Cannot set kraft.version to 0 if
controller.quorum.voters is empty and one of the flags " +
+ "--standalone, --initial-controllers, or
--no-initial-controllers is used. For dynamic " +
+ "controllers support, try removing the --feature flag
for kraft.version."
);
}
} else {
if (!hasDynamicQuorum()) {
throw new FormatterException(
- "Cannot set kraft.version to " +
- configuredKRaftVersionLevel.get() +
- " unless one of the flags --standalone,
--initial-controllers, or --no-initial-controllers is used. " +
- "For dynamic controllers support, try using one of
--standalone, --initial-controllers, or " +
- "--no-initial-controllers."
+ "Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
+ " unless controller.quorum.voters is empty and one of
the flags --standalone, " +
+ "--initial-controllers, or --no-initial-controllers is
used. " +
+ "For dynamic controllers support, try using one of
--standalone, --initial-controllers, " +
+ "or --no-initial-controllers and removing
controller.quorum.voters."
);
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index d4e13b4ccab..e57002abb8e 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.GroupVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.server.common.TransactionVersion;
@@ -200,6 +201,7 @@ public class FormatterTest {
String newDirectoryId = Uuid.randomUuid().toString();
formatter1.formatter
.setInitialControllers(DynamicVoters.parse("1@localhost:8020:"
+ originalDirectoryId))
+ .setHasDynamicQuorum(true)
.run();
assertEquals("Bootstrap metadata: " +
formatter1.formatter.bootstrapMetadata() +
"\nFormatting dynamic metadata voter directory " +
testEnv.directory(0) +
@@ -422,13 +424,14 @@ public class FormatterTest {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
if (specifyKRaftVersion) {
- formatter1.formatter.setFeatureLevel("kraft.version", (short)
1);
+
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
}
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ formatter1.formatter.setHasDynamicQuorum(true);
formatter1.formatter.run();
- assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
assertEquals(List.of(
"Bootstrap metadata: " +
formatter1.formatter.bootstrapMetadata(),
String.format("Formatting data directory %s with %s %s.",
@@ -456,49 +459,66 @@ public class FormatterTest {
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
+ formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
(short) 0);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ formatter1.formatter.setHasDynamicQuorum(true);
assertTrue(formatter1.formatter.hasDynamicQuorum());
assertEquals(
- "Cannot set kraft.version to 0 if one of the flags
--standalone, --initial-controllers, or " +
- "--no-initial-controllers is used. For dynamic controllers
support, try removing the " +
- "--feature flag for kraft.version.",
+ "Cannot set kraft.version to 0 if controller.quorum.voters is
empty " +
+ "and one of the flags --standalone, --initial-controllers, or
--no-initial-controllers is used. " +
+ "For dynamic controllers support, try removing the --feature
flag for kraft.version.",
assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage()
);
}
}
@Test
- public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion()
throws Exception {
+ public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+ formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
(short) 1);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
assertFalse(formatter1.formatter.hasDynamicQuorum());
assertEquals(
- "Cannot set kraft.version to 1 unless one of the flags
--standalone, --initial-controllers, or " +
- "--no-initial-controllers is used. For dynamic controllers
support, try using one of " +
- "--standalone, --initial-controllers, or
--no-initial-controllers.",
+ "Cannot set kraft.version to 1 unless controller.quorum.voters
is empty and " +
+ "one of the flags --standalone, --initial-controllers, or
--no-initial-controllers is used. " +
+ "For dynamic controllers support, try using one of
--standalone, --initial-controllers, " +
+ "or --no-initial-controllers and removing
controller.quorum.voters.",
assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage()
);
}
}
@Test
- public void testFormatWithInitialVotersFailsWithOlderMetadataVersion()
throws Exception {
+ public void testFormatWithInitialVotersWithOlderMetadataVersion() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+ formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
(short) 1);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- assertEquals("kraft.version could not be set to 1 because it
depends on " +
- "metadata.version level 21",
- assertThrows(IllegalArgumentException.class,
- formatter1.formatter::run).getMessage());
+ formatter1.formatter.setHasDynamicQuorum(true);
+ formatter1.formatter.run();
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean
hasDynamicQuorum) throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+ formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
+ formatter1.formatter.run();
+ if (hasDynamicQuorum) {
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ } else {
+ assertEquals((short) 0,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ }
}
}
@@ -519,6 +539,7 @@ public class FormatterTest {
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 1);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ formatter1.formatter.setHasDynamicQuorum(true);
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
assertDoesNotThrow(formatter1.formatter::run);
} else {
@@ -530,20 +551,14 @@ public class FormatterTest {
}
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testFormatWithNoInitialControllers(boolean
specifyKRaftVersion) throws Exception {
+ @Test
+ public void testFormatWithNoInitialControllers() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
- if (specifyKRaftVersion) {
- formatter1.formatter.setFeatureLevel("kraft.version", (short)
1);
- }
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- formatter1.formatter.setNoInitialControllersFlag(true);
- assertTrue(formatter1.formatter.hasDynamicQuorum());
-
+ assertFalse(formatter1.formatter.hasDynamicQuorum());
formatter1.formatter.run();
- assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+ assertEquals((short) 0,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
assertEquals(List.of(
"Bootstrap metadata: " +
formatter1.formatter.bootstrapMetadata(),
String.format("Formatting data directory %s with %s %s.",
@@ -564,38 +579,4 @@ public class FormatterTest {
assertNotNull(logDirProps1);
}
}
-
- @Test
- public void
testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws
Exception {
- try (TestEnv testEnv = new TestEnv(2)) {
- FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
- formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- formatter1.formatter.setNoInitialControllersFlag(false);
- assertFalse(formatter1.formatter.hasDynamicQuorum());
- assertEquals(
- "Cannot set kraft.version to 1 unless one of the flags
--standalone, --initial-controllers, or " +
- "--no-initial-controllers is used. For dynamic controllers
support, try using one of " +
- "--standalone, --initial-controllers, or
--no-initial-controllers.",
- assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage()
- );
- }
- }
-
- @Test
- public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion()
throws Exception {
- try (TestEnv testEnv = new TestEnv(2)) {
- FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
- formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- formatter1.formatter.setNoInitialControllersFlag(true);
- assertTrue(formatter1.formatter.hasDynamicQuorum());
- assertEquals(
- "Cannot set kraft.version to 0 if one of the flags
--standalone, --initial-controllers, or " +
- "--no-initial-controllers is used. For dynamic controllers
support, try removing the " +
- "--feature flag for kraft.version.",
- assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage()
- );
- }
- }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
index 463cc2a015c..d797880c776 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -72,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion {
@Override
public Map<String, Short> dependencies() {
- if (this.featureLevel == 0) {
- return Map.of();
- } else {
- return Map.of(
- MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_9_IV0.featureLevel());
- }
+ return Map.of();
}
public boolean isAtLeast(KRaftVersion otherVersion) {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 59041c7a66a..a8440ff32fd 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -45,7 +45,6 @@ import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.fault.FaultHandler;
@@ -182,18 +181,31 @@ public class KafkaClusterTestKit implements AutoCloseable
{
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
brokerListenerName);
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
controllerListenerName);
- StringBuilder quorumVoterStringBuilder = new StringBuilder();
- String prefix = "";
- for (int nodeId : nodes.controllerNodes().keySet()) {
- quorumVoterStringBuilder.append(prefix).
- append(nodeId).
- append("@").
- append("localhost").
- append(":").
-
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
- prefix = ",";
+ if (!standalone && initialVoterSet.isEmpty()) {
+ StringBuilder quorumVoterStringBuilder = new StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ quorumVoterStringBuilder.append(prefix).
+ append(nodeId).
+ append("@").
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
+ } else {
+ StringBuilder bootstrapServersStringBuilder = new
StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ bootstrapServersStringBuilder.append(prefix).
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG,
bootstrapServersStringBuilder.toString());
}
- props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
// reduce log cleaner offset map memory usage
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
@@ -277,7 +289,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
- List.of(),
+
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
@@ -305,7 +317,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
- List.of(),
+
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
@@ -467,8 +479,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
return;
}
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
- formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
-
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
formatter.setUnstableFeatureVersionsEnabled(true);
formatter.setIgnoreFormatted(false);
formatter.setControllerListenerName(controllerListenerName);
@@ -477,53 +487,43 @@ public class KafkaClusterTestKit implements AutoCloseable
{
} else {
formatter.setMetadataLogDirectory(Optional.empty());
}
- if
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
- StringBuilder dynamicVotersBuilder = new StringBuilder();
- String prefix = "";
- if (standalone) {
- if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
- final var controllerNode =
nodes.controllerNodes().get(nodeId);
- dynamicVotersBuilder.append(
- String.format(
- "%d@localhost:%d:%s",
- controllerNode.id(),
- socketFactoryManager.
-
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
- controllerNode.metadataDirectoryId()
- )
- );
-
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
- } else {
- formatter.setNoInitialControllersFlag(true);
- }
- } else if (initialVoterSet.isPresent()) {
- for (final var controllerNode :
initialVoterSet.get().entrySet()) {
- final var voterId = controllerNode.getKey();
- final var voterDirectoryId = controllerNode.getValue();
- dynamicVotersBuilder.append(prefix);
- prefix = ",";
- dynamicVotersBuilder.append(
- String.format(
- "%d@localhost:%d:%s",
- voterId,
- socketFactoryManager.
- getOrCreatePortForListener(voterId,
controllerListenerName),
- voterDirectoryId
- )
- );
- }
-
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
- } else {
- for (TestKitNode controllerNode :
nodes.controllerNodes().values()) {
- int port = socketFactoryManager.
- getOrCreatePortForListener(controllerNode.id(),
controllerListenerName);
- dynamicVotersBuilder.append(prefix);
- prefix = ",";
-
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
- controllerNode.id(), port,
controllerNode.metadataDirectoryId()));
- }
+ StringBuilder dynamicVotersBuilder = new StringBuilder();
+ String prefix = "";
+ if (standalone) {
+ if (nodeId == TestKitDefaults.BROKER_ID_OFFSET +
TestKitDefaults.CONTROLLER_ID_OFFSET) {
+ final var controllerNode =
nodes.controllerNodes().get(nodeId);
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ controllerNode.id(),
+ socketFactoryManager.
+
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+ controllerNode.metadataDirectoryId()
+ )
+ );
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
+ // when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET,
the node is formatting with
+ // the --no-initial-controllers flag
+ formatter.setHasDynamicQuorum(true);
+ } else if (initialVoterSet.isPresent()) {
+ for (final var controllerNode :
initialVoterSet.get().entrySet()) {
+ final var voterId = controllerNode.getKey();
+ final var voterDirectoryId = controllerNode.getValue();
+ dynamicVotersBuilder.append(prefix);
+ prefix = ",";
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ voterId,
+ socketFactoryManager.
+ getOrCreatePortForListener(voterId,
controllerListenerName),
+ voterDirectoryId
+ )
+ );
+ }
+
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+ formatter.setHasDynamicQuorum(true);
}
formatter.run();
} catch (Exception e) {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index a9667dbd631..3622430f487 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -93,11 +93,6 @@ public class TestKitNodes {
return this;
}
- public Builder setFeature(String featureName, short level) {
- this.bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
- return this;
- }
-
public Builder setCombined(boolean combined) {
this.combined = combined;
return this;