junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1602305075
########## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersionUtils.FeatureVersionImpl { + + TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), + TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), + TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()); + + private short featureLevel; + private MetadataVersion metadataVersionMapping; + private Map<String, Short> dependencies; Review Comment: Should we make those instance vals final? 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -156,16 +200,27 @@ object StorageTool extends Logging { def getMetadataVersion( namespace: Namespace, + featureNamesAndLevelsMap: Map[String, java.lang.Short], defaultVersionString: Option[String] ): MetadataVersion = { val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } - Option(namespace.getString("release_version")) - .map(ver => MetadataVersion.fromVersionString(ver)) - .getOrElse(defaultValue) + val releaseVersionTag = Option(namespace.getString("release_version")) + val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) + + (releaseVersionTag, featureTag) match { + case (Some(_), Some(_)) => + throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.") Review Comment: We should disallow the case where both --release-version and --feature are used, but --feature doesn't include metadata.version, right? ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -141,6 +189,9 @@ object StorageTool extends Logging { formatParser.addArgument("--release-version", "-r"). action(store()). help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") + formatParser.addArgument("--feature"). + help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). Review Comment: Should we add a shorthand for --feature like other options? 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -109,6 +105,51 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { + if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") + } + if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { + System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { + throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } + } + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], + metadataVersion: MetadataVersion, + specifiedFeatures: Map[String, java.lang.Short], + allFeatures: List[Features]): Unit = { + // If we are using --version-default, the default is based on the metadata version. Review Comment: Hmm, does the storage tool support the --version-default option? ########## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class FinalizedFeatures { + private final MetadataVersion version; Review Comment: version => metadataVersion ? ########## core/src/main/scala/kafka/server/BrokerFeatures.scala: ########## @@ -75,16 +76,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { - Features.supportedFeatures( - java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, + val features = new util.HashMap[String, SupportedVersionRange]() + features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableMetadataVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel - } - ))) + })) + FeatureVersion.PRODUCTION_FEATURES.forEach { feature => Review Comment: The KIP says to generalize unstableMetadataVersionsEnabled to unstableFeatureVersionsEnabled. Do we plan to add it in this PR? ########## core/src/main/scala/kafka/server/ApiVersionManager.scala: ########## @@ -80,14 +80,14 @@ class SimpleApiVersionManager( brokerFeatures: org.apache.kafka.common.feature.Features[SupportedVersionRange], val enableUnstableLastVersion: Boolean, val zkMigrationEnabled: Boolean, - val featuresProvider: () => Features + val featuresProvider: () => FinalizedFeatures Review Comment: This is an existing issue. Could we add the javadoc for the missing param? 
########## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ########## @@ -16,72 +16,143 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * <br> + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + + /** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ + TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion.LATEST_PRODUCTION); -public final class Features { - private final MetadataVersion version; - private final Map<String, Short> finalizedFeatures; - private final long finalizedFeaturesEpoch; + public static final Features[] FEATURES; + public static final List<Features> PRODUCTION_FEATURES; - public static Features fromKRaftVersion(MetadataVersion version) { - return new Features(version, Collections.emptyMap(), -1, true); + public static final List<String> PRODUCTION_FEATURE_NAMES; + private final String name; + private final FeatureVersion[] featureVersions; + private final FeatureVersion latestProductionVersion; + + Features(String name, + FeatureVersion[] featureVersions, + FeatureVersion latestProductionVersion) { + this.name = name; + this.featureVersions = featureVersions; + this.latestProductionVersion = latestProductionVersion; } - public Features( - MetadataVersion version, - Map<String, Short> finalizedFeatures, - long finalizedFeaturesEpoch, - boolean kraftMode - ) { - this.version = version; - this.finalizedFeatures = new HashMap<>(finalizedFeatures); - this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; - // In KRaft mode, we always include the metadata version in the features map. - // In ZK mode, we never include it. 
- if (kraftMode) { - this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); - } else { - this.finalizedFeatures.remove(FEATURE_NAME); - } + static { + Features[] enumValues = Features.values(); + FEATURES = Arrays.copyOf(enumValues, enumValues.length); + + PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + feature.latestProductionVersion != null).collect(Collectors.toList()); + PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> + feature.name).collect(Collectors.toList()); } - public MetadataVersion metadataVersion() { - return version; + public String featureName() { + return name; } - public Map<String, Short> finalizedFeatures() { - return finalizedFeatures; + public FeatureVersion[] featureVersions() { + return featureVersions; } - public long finalizedFeaturesEpoch() { - return finalizedFeaturesEpoch; + /** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. Review Comment: No name is provided, right? 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { + if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") + } + if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { + System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { + throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } + } + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], + metadataVersion: MetadataVersion, + specifiedFeatures: Map[String, java.lang.Short], + allFeatures: List[Features], + usesVersionDefault: Boolean): Unit = { + // If we are using --version-default, the default is based on the metadata version. + val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: I am wondering why we need metadataVersionForDefault. If --release-version is not specified, metadataVersion defaults to latest production version. Do we get the same result by passing in that metadataVersion to `feature.defaultValue()`? 
########## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ########## @@ -16,72 +16,143 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * <br> + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + + /** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ + TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion.LATEST_PRODUCTION); -public final class Features { - private final MetadataVersion version; - private final Map<String, Short> finalizedFeatures; - private final long finalizedFeaturesEpoch; + public static final Features[] FEATURES; + public static final List<Features> PRODUCTION_FEATURES; - public static Features fromKRaftVersion(MetadataVersion version) { - return new Features(version, Collections.emptyMap(), -1, true); + public static final List<String> PRODUCTION_FEATURE_NAMES; + private final String name; + private final FeatureVersion[] featureVersions; + private final FeatureVersion latestProductionVersion; + + Features(String name, + FeatureVersion[] featureVersions, + FeatureVersion latestProductionVersion) { + this.name = name; + this.featureVersions = featureVersions; + this.latestProductionVersion = latestProductionVersion; } - public Features( - MetadataVersion version, - Map<String, Short> finalizedFeatures, - long finalizedFeaturesEpoch, - boolean kraftMode - ) { - this.version = version; - this.finalizedFeatures = new HashMap<>(finalizedFeatures); - this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; - // In KRaft mode, we always include the metadata version in the features map. - // In ZK mode, we never include it. 
- if (kraftMode) { - this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); - } else { - this.finalizedFeatures.remove(FEATURE_NAME); - } + static { + Features[] enumValues = Features.values(); + FEATURES = Arrays.copyOf(enumValues, enumValues.length); + + PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + feature.latestProductionVersion != null).collect(Collectors.toList()); + PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> + feature.name).collect(Collectors.toList()); } - public MetadataVersion metadataVersion() { - return version; + public String featureName() { + return name; } - public Map<String, Short> finalizedFeatures() { - return finalizedFeatures; + public FeatureVersion[] featureVersions() { + return featureVersions; } - public long finalizedFeaturesEpoch() { - return finalizedFeaturesEpoch; + /** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. Review Comment: @returns => @return -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
