This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac63ce9789a KAFKA-19544 Improve `MetadataVersion.fromVersionString()`
to take an enableUnstableFeature flag (#20248)
ac63ce9789a is described below
commit ac63ce9789a7d7d283958ed6f01af996fcd85159
Author: Lan Ding <[email protected]>
AuthorDate: Thu Sep 25 01:06:54 2025 +0800
KAFKA-19544 Improve `MetadataVersion.fromVersionString()` to take an
enableUnstableFeature flag (#20248)
Improve `MetadataVersion.fromVersionString()` to take an
`enableUnstableFeature` flag, and enable `FeatureCommand` and
`StorageTool` to leverage the exception message from
`fromVersionString`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/tools/StorageTool.scala | 22 +++---
.../kafka/server/KRaftClusterTest.scala | 2 +-
.../scala/unit/kafka/tools/StorageToolTest.scala | 14 ++--
.../controller/PartitionChangeBuilderTest.java | 10 +--
.../kafka/server/common/MetadataVersion.java | 27 ++++++--
.../kafka/server/common/MetadataVersionTest.java | 79 +++++++++++++---------
.../org/apache/kafka/tools/FeatureCommand.java | 26 ++-----
.../org/apache/kafka/tools/FeatureCommandTest.java | 20 ++----
8 files changed, 101 insertions(+), 99 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 63993ed5ea9..c342ddfe071 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -129,17 +129,12 @@ object StorageTool extends Logging {
setControllerListenerName(config.controllerListenerNames.get(0)).
setMetadataLogDirectory(config.metadataLogDir)
- def metadataVersionsToString(first: MetadataVersion, last:
MetadataVersion): String = {
- val versions = MetadataVersion.VERSIONS.slice(first.ordinal,
last.ordinal + 1)
- versions.map(_.toString).mkString(", ")
- }
Option(namespace.getString("release_version")).foreach(releaseVersion => {
try {
-
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
+
formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion,
config.unstableFeatureVersionsEnabled))
} catch {
- case _: Throwable =>
- throw new TerseFailure(s"Unknown metadata.version $releaseVersion.
Supported metadata.version are " +
- s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestProduction())}")
+ case e: Throwable =>
+ throw new TerseFailure(e.getMessage)
}
})
@@ -184,9 +179,9 @@ object StorageTool extends Logging {
* Maps the given release version to the corresponding metadata version
* and prints the corresponding features.
*
- * @param namespace Arguments containing the release version.
- * @param printStream The print stream to output the version mapping.
- * @param validFeatures List of features to be considered in the output
+ * @param namespace Arguments containing the release
version.
+ * @param printStream The print stream to output the
version mapping.
+ * @param validFeatures List of features to be considered in
the output.
*/
def runVersionMappingCommand(
namespace: Namespace,
@@ -195,7 +190,7 @@ object StorageTool extends Logging {
): Unit = {
val releaseVersion =
Option(namespace.getString("release_version")).getOrElse(MetadataVersion.LATEST_PRODUCTION.toString)
try {
- val metadataVersion = MetadataVersion.fromVersionString(releaseVersion)
+ val metadataVersion = MetadataVersion.fromVersionString(releaseVersion,
true)
val metadataVersionLevel = metadataVersion.featureLevel()
printStream.print(f"metadata.version=$metadataVersionLevel%d
($releaseVersion%s)%n")
@@ -206,8 +201,7 @@ object StorageTool extends Logging {
}
} catch {
case e: IllegalArgumentException =>
- throw new TerseFailure(s"Unknown release version '$releaseVersion'.
Supported versions are: " +
- s"${MetadataVersion.MINIMUM_VERSION.version} to
${MetadataVersion.latestTesting().version()}")
+ throw new TerseFailure(e.getMessage)
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index dfc999d7c64..27b06daed21 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -807,7 +807,7 @@ class KRaftClusterTest {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(4).
-
setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString)).
+
setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString,
true)).
setNumControllerNodes(3).build()).
build()
try {
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 344df08f56b..1e938ea9cd8 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -315,7 +315,7 @@ Found problem:
val stream = new ByteArrayOutputStream()
val failure = assertThrows(classOf[TerseFailure], () =>
runFormatCommand(stream, properties, Seq("--release-version",
"3.3-IV1"))).getMessage
- assertTrue(failure.contains("Unknown metadata.version 3.3-IV1"))
+ assertTrue(failure.contains("Unknown metadata.version '3.3-IV1'"))
assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version))
assertTrue(failure.contains(MetadataVersion.latestProduction().version))
}
@@ -735,18 +735,18 @@ Found problem:
runVersionMappingCommand(stream, "2.9-IV2")
})
- assertEquals("Unknown release version '2.9-IV2'." +
- " Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version +
- " to " + MetadataVersion.latestTesting().version, exception.getMessage
+ assertEquals("Unknown metadata.version '2.9-IV2'. Supported
metadata.version are: " +
+
MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestTesting()),
+ exception.getMessage
)
val exception2 = assertThrows(classOf[TerseFailure], () => {
runVersionMappingCommand(stream, "invalid")
})
- assertEquals("Unknown release version 'invalid'." +
- " Supported versions are: " + MetadataVersion.MINIMUM_VERSION.version +
- " to " + MetadataVersion.latestTesting().version, exception2.getMessage
+ assertEquals("Unknown metadata.version 'invalid'. Supported
metadata.version are: " +
+
MetadataVersion.metadataVersionsToString(MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestTesting()),
+ exception2.getMessage
)
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index bb5f5b9216d..312a207f8d7 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -310,7 +310,7 @@ public class PartitionChangeBuilderTest {
@ParameterizedTest
@ValueSource(strings = {"3.6-IV0", "3.7-IV2", "4.0-IV0"})
public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString)
{
- MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString);
+ MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString, true);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))),
@@ -325,7 +325,7 @@ public class PartitionChangeBuilderTest {
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
- MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString);
+ MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString, true);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1))),
@@ -339,7 +339,7 @@ public class PartitionChangeBuilderTest {
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2",
"4.0-IV0"})
public void testNoLeaderEpochBumpOnIsrExpansion(String
metadataVersionString) {
- MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString);
+ MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString, true);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(2, 1, 3,
4))),
@@ -354,7 +354,7 @@ public class PartitionChangeBuilderTest {
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2",
"4.0-IV0"})
public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String
metadataVersionString) {
- MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString);
+ MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString, true);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetReplicas(List.of(2, 1,
4)),
new PartitionChangeRecord(),
@@ -368,7 +368,7 @@ public class PartitionChangeBuilderTest {
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"})
public void testNoLeaderEpochBumpOnEmptyTargetIsr(String
metadataVersionString) {
- MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString);
+ MetadataVersion metadataVersion =
MetadataVersion.fromVersionString(metadataVersionString, true);
PartitionRegistration partition = new PartitionRegistration.Builder().
setReplicas(new int[] {2}).
setDirectories(new Uuid[]{
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index ceca9a6a7de..940f58e26c6 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -19,9 +19,11 @@ package org.apache.kafka.server.common;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* This class contains the different Kafka versions.
@@ -340,11 +342,12 @@ public enum MetadataVersion {
/**
* Return an `MetadataVersion` instance for `versionString`, which can be
in a variety of formats (e.g. "3.8", "3.8.x",
- * "3.8.0", "3.8-IV0"). `IllegalArgumentException` is thrown if
`versionString` cannot be mapped to an `MetadataVersion`.
+ * "3.8.0", "3.8-IV0"). The `unstableFeatureVersionsEnabled` parameter
determines whether unstable versions are permitted.
+ * `IllegalArgumentException` is thrown if `versionString` cannot be
mapped to an `MetadataVersion`.
* Note that 'misconfigured' values such as "3.8.1" will be parsed to
`IBP_3_8_IV0` as we ignore anything after the first
* two segments.
*/
- public static MetadataVersion fromVersionString(String versionString) {
+ public static MetadataVersion fromVersionString(String versionString,
boolean unstableFeatureVersionsEnabled) {
String[] versionSegments = versionString.split(Pattern.quote("."));
int numSegments = 2;
String key;
@@ -353,10 +356,22 @@ public enum MetadataVersion {
} else {
key = String.join(".", Arrays.copyOfRange(versionSegments, 0,
numSegments));
}
- return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() ->
- new IllegalArgumentException("Version " + versionString + " is not
a valid version. The minimum version is " + MINIMUM_VERSION
- + " and the maximum version is " + latestTesting())
- );
+
+ MetadataVersion metadataVersion = IBP_VERSIONS.get(key);
+ if (metadataVersion == null || (!unstableFeatureVersionsEnabled &&
!metadataVersion.isProduction())) {
+ String errorMsg = "Unknown metadata.version '" + versionString +
"'. Supported metadata.version are: "
+ + metadataVersionsToString(MetadataVersion.MINIMUM_VERSION,
+ unstableFeatureVersionsEnabled ?
MetadataVersion.latestTesting() : MetadataVersion.latestProduction());
+ throw new IllegalArgumentException(errorMsg);
+ }
+ return metadataVersion;
+ }
+
+ public static String metadataVersionsToString(MetadataVersion first,
MetadataVersion last) {
+ List<MetadataVersion> versions =
List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
+ return versions.stream()
+ .map(String::valueOf)
+ .collect(Collectors.joining(", "));
}
public static MetadataVersion fromFeatureLevel(short version) {
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 49a200f6225..136fdeaa4ec 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import static org.apache.kafka.server.common.MetadataVersion.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class MetadataVersionTest {
@@ -42,55 +43,69 @@ class MetadataVersionTest {
@SuppressWarnings("checkstyle:JavaNCSS")
public void testFromVersionString() {
// 3.3-IV3 is the latest production version in the 3.3 line
- assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
- assertEquals(IBP_3_3_IV3,
MetadataVersion.fromVersionString("3.3-IV3"));
+ assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3",
true));
+ assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3",
true));
// 3.4-IV0 is the latest production version in the 3.4 line
- assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4"));
- assertEquals(IBP_3_4_IV0,
MetadataVersion.fromVersionString("3.4-IV0"));
+ assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4",
true));
+ assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0",
true));
// 3.5-IV2 is the latest production version in the 3.5 line
- assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5"));
- assertEquals(IBP_3_5_IV0,
MetadataVersion.fromVersionString("3.5-IV0"));
- assertEquals(IBP_3_5_IV1,
MetadataVersion.fromVersionString("3.5-IV1"));
- assertEquals(IBP_3_5_IV2,
MetadataVersion.fromVersionString("3.5-IV2"));
+ assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5",
true));
+ assertEquals(IBP_3_5_IV0, MetadataVersion.fromVersionString("3.5-IV0",
true));
+ assertEquals(IBP_3_5_IV1, MetadataVersion.fromVersionString("3.5-IV1",
true));
+ assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5-IV2",
true));
// 3.6-IV2 is the latest production version in the 3.6 line
- assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6"));
- assertEquals(IBP_3_6_IV0,
MetadataVersion.fromVersionString("3.6-IV0"));
- assertEquals(IBP_3_6_IV1,
MetadataVersion.fromVersionString("3.6-IV1"));
- assertEquals(IBP_3_6_IV2,
MetadataVersion.fromVersionString("3.6-IV2"));
+ assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6",
true));
+ assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0",
true));
+ assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1",
true));
+ assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2",
true));
// 3.7-IV4 is the latest production version in the 3.7 line
- assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7"));
- assertEquals(IBP_3_7_IV0,
MetadataVersion.fromVersionString("3.7-IV0"));
- assertEquals(IBP_3_7_IV1,
MetadataVersion.fromVersionString("3.7-IV1"));
- assertEquals(IBP_3_7_IV2,
MetadataVersion.fromVersionString("3.7-IV2"));
- assertEquals(IBP_3_7_IV3,
MetadataVersion.fromVersionString("3.7-IV3"));
- assertEquals(IBP_3_7_IV4,
MetadataVersion.fromVersionString("3.7-IV4"));
+ assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7",
true));
+ assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0",
true));
+ assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1",
true));
+ assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2",
true));
+ assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3",
true));
+ assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4",
true));
// 3.8-IV0 is the latest production version in the 3.8 line
- assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8"));
- assertEquals(IBP_3_8_IV0,
MetadataVersion.fromVersionString("3.8-IV0"));
+ assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8",
true));
+ assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0",
true));
// 3.9-IV0 is the latest production version in the 3.9 line
- assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9"));
- assertEquals(IBP_3_9_IV0,
MetadataVersion.fromVersionString("3.9-IV0"));
+ assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9",
true));
+ assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0",
true));
// 4.0-IV3 is the latest production version in the 4.0 line
- assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0"));
- assertEquals(IBP_4_0_IV0,
MetadataVersion.fromVersionString("4.0-IV0"));
- assertEquals(IBP_4_0_IV1,
MetadataVersion.fromVersionString("4.0-IV1"));
- assertEquals(IBP_4_0_IV2,
MetadataVersion.fromVersionString("4.0-IV2"));
- assertEquals(IBP_4_0_IV3,
MetadataVersion.fromVersionString("4.0-IV3"));
+ assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0",
true));
+ assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0",
true));
+ assertEquals(IBP_4_0_IV1, MetadataVersion.fromVersionString("4.0-IV1",
true));
+ assertEquals(IBP_4_0_IV2, MetadataVersion.fromVersionString("4.0-IV2",
true));
+ assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0-IV3",
true));
// 4.1-IV1 is the latest production version in the 4.1 line
- assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1"));
- assertEquals(IBP_4_1_IV0,
MetadataVersion.fromVersionString("4.1-IV0"));
- assertEquals(IBP_4_1_IV1,
MetadataVersion.fromVersionString("4.1-IV1"));
+ assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1",
true));
+ assertEquals(IBP_4_1_IV0, MetadataVersion.fromVersionString("4.1-IV0",
true));
+ assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1-IV1",
true));
+
+ assertEquals(IBP_4_2_IV0, MetadataVersion.fromVersionString("4.2-IV0",
true));
+ assertEquals(IBP_4_2_IV1, MetadataVersion.fromVersionString("4.2-IV1",
true));
+
+ // Throws exception when unstableFeatureVersionsEnabled is false
+ assertEquals("Unknown metadata.version '4.2-IV0'. Supported
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
+ + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2, 3.7-IV3,
3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3, 4.1-IV0,
4.1-IV1",
+ assertThrows(IllegalArgumentException.class, () ->
fromVersionString("4.2-IV0", false)).getMessage());
+ assertEquals("Unknown metadata.version '4.2-IV1'. Supported
metadata.version are: 3.3-IV3, 3.4-IV0, 3.5-IV0, 3.5-IV1, 3.5-IV2, "
+ + "3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 3.7-IV2,
3.7-IV3, 3.7-IV4, 3.8-IV0, 3.9-IV0, 4.0-IV0, 4.0-IV1, 4.0-IV2, 4.0-IV3,
4.1-IV0, 4.1-IV1",
+ assertThrows(IllegalArgumentException.class, () ->
fromVersionString("4.2-IV1", false)).getMessage());
+ }
- assertEquals(IBP_4_2_IV0,
MetadataVersion.fromVersionString("4.2-IV0"));
- assertEquals(IBP_4_2_IV1,
MetadataVersion.fromVersionString("4.2-IV1"));
+ @Test
+ public void testMetadataVersionsToString() {
+ assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0",
+
MetadataVersion.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0,
MetadataVersion.IBP_3_6_IV0));
}
@Test
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index 103821cf21d..87e4c228baf 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -46,7 +46,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
import static net.sourceforge.argparse4j.impl.Arguments.append;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -238,13 +237,6 @@ public class FeatureCommand {
});
}
- static String metadataVersionsToString(MetadataVersion first,
MetadataVersion last) {
- List<MetadataVersion> versions =
List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
- return versions.stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", "));
- }
-
static void handleUpgrade(Namespace namespace, Admin adminClient) throws
TerseException {
handleUpgradeOrDowngrade("upgrade", namespace, adminClient,
FeatureUpdate.UpgradeType.UPGRADE);
}
@@ -292,12 +284,10 @@ public class FeatureCommand {
if (releaseVersion != null) {
try {
- metadataVersion =
MetadataVersion.fromVersionString(releaseVersion);
+ metadataVersion =
MetadataVersion.fromVersionString(releaseVersion, true);
updates.put(metadataVersion.featureName(), new
FeatureUpdate(metadataVersion.featureLevel(), upgradeType));
} catch (Throwable e) {
- throw new TerseException("Unknown metadata.version " +
releaseVersion +
- ". Supported metadata.version are " +
metadataVersionsToString(
- MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestProduction()));
+ throw new TerseException(e.getMessage());
}
try {
for (Feature feature : Feature.PRODUCTION_FEATURES) {
@@ -315,11 +305,9 @@ public class FeatureCommand {
if (metadata != null) {
System.out.println(" `metadata` flag is deprecated and may be
removed in a future release.");
try {
- metadataVersion =
MetadataVersion.fromVersionString(metadata);
+ metadataVersion =
MetadataVersion.fromVersionString(metadata, true);
} catch (Throwable e) {
- throw new TerseException("Unknown metadata.version " +
metadata +
- ". Supported metadata.version are " +
metadataVersionsToString(
- MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestProduction()));
+ throw new TerseException(e.getMessage());
}
updates.put(MetadataVersion.FEATURE_NAME, new
FeatureUpdate(metadataVersion.featureLevel(), upgradeType));
}
@@ -361,7 +349,7 @@ public class FeatureCommand {
.orElseGet(() -> MetadataVersion.latestProduction().version());
try {
- MetadataVersion version =
MetadataVersion.fromVersionString(releaseVersion);
+ MetadataVersion version =
MetadataVersion.fromVersionString(releaseVersion, true);
short metadataVersionLevel = version.featureLevel();
System.out.printf("metadata.version=%d (%s)%n",
metadataVersionLevel, releaseVersion);
@@ -371,9 +359,7 @@ public class FeatureCommand {
System.out.printf("%s=%d%n", feature.featureName(),
featureLevel);
}
} catch (IllegalArgumentException e) {
- throw new TerseException("Unknown release version '" +
releaseVersion + "'." +
- " Supported versions are: " + MetadataVersion.MINIMUM_VERSION +
- " to " + MetadataVersion.latestTesting().version());
+ throw new TerseException(e.getMessage());
}
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 19cd8bf2a37..2caaf8a2918 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -210,12 +210,6 @@ public class FeatureCommandTest {
FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_9_IV0.featureLevel()));
}
- @Test
- public void testMetadataVersionsToString() {
- assertEquals("3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0",
-
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_5_IV0,
MetadataVersion.IBP_3_6_IV0));
- }
-
@Test
public void testDowngradeType() {
assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(
@@ -274,7 +268,7 @@ public class FeatureCommandTest {
namespace.put("feature", List.of("foo.bar=6"));
namespace.put("dry_run", false);
Throwable t = assertThrows(TerseException.class, () ->
FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient()));
- assertTrue(t.getMessage().contains("Unknown metadata.version
3.3-IV1"));
+ assertTrue(t.getMessage().contains("Unknown metadata.version
'3.3-IV1'"));
}
@Test
@@ -371,7 +365,7 @@ public class FeatureCommandTest {
namespace.put("release_version", "foo");
ToolsTestUtils.captureStandardOut(() -> {
Throwable t = assertThrows(TerseException.class, () ->
FeatureCommand.handleUpgrade(new Namespace(namespace), buildAdminClient()));
- assertTrue(t.getMessage().contains("Unknown metadata.version
foo."));
+ assertTrue(t.getMessage().contains("Unknown metadata.version
'foo'."));
});
}
@@ -452,9 +446,8 @@ public class FeatureCommandTest {
FeatureCommand.handleVersionMapping(new Namespace(namespace),
testingFeatures)
);
- assertEquals("Unknown release version '2.9-IV2'." +
- " Supported versions are: " + MetadataVersion.MINIMUM_VERSION +
- " to " + MetadataVersion.latestTesting().version(),
exception1.getMessage());
+ assertEquals("Unknown metadata.version '2.9-IV2'. Supported
metadata.version are: " + MetadataVersion.metadataVersionsToString(
+ MetadataVersion.MINIMUM_VERSION,
MetadataVersion.latestTesting()), exception1.getMessage());
namespace.put("release_version", "invalid");
@@ -462,9 +455,8 @@ public class FeatureCommandTest {
FeatureCommand.handleVersionMapping(new Namespace(namespace),
testingFeatures)
);
- assertEquals("Unknown release version 'invalid'." +
- " Supported versions are: " + MetadataVersion.MINIMUM_VERSION +
- " to " + MetadataVersion.latestTesting().version(),
exception2.getMessage());
+ assertEquals("Unknown metadata.version 'invalid'. Supported
metadata.version are: " + MetadataVersion.metadataVersionsToString(
+ MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting()),
exception2.getMessage());
}
@Test