Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan merged PR #15685: URL: https://github.com/apache/kafka/pull/15685 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2138416302 Test failures are unrelated. Merging 🎉 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619346921 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: ``` val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } val releaseVersionTag = Option(namespace.getString("release_version")) val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) (releaseVersionTag, featureTag) match { case (Some(_), Some(_)) => // We should throw an error before we hit this case, but include for completeness throw new IllegalArgumentException("Both --release_version and --feature were set. 
Only one of the two flags can be set.") case (Some(version), None) => MetadataVersion.fromVersionString(version) case (None, Some(level)) => MetadataVersion.fromFeatureLevel(level) case (None, None) => defaultValue } ``` ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != nu
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: We only use the passed-in metadata version for defaults if --release-version is specified. If --release-version is specified, we don't use the replication configs. ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). 
build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: We only use the passed-in metadata version for defaults if --release-version is specified. If --release-version is specified, we don't use the replication configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619332926 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: Thanks for the reply. Got it now. We pass in INTER_BROKER_PROTOCOL_VERSION_CONFIG as the default when calling `getMetadataVersion`. But that config shouldn't impact the MV used for selecting other features. ``` val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619263353 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: I've updated this to make it clearer, but I think the original code is correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619260318 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: Sorry, I thought about this more. I don't think this is correct. If we don't specify --release-version, we will use the latest production version. Where do you see that we use the replication configs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619249547 ## server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java: ## @@ -14,37 +14,101 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.server.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -class FeaturesTest { -@Test -public void testKRaftModeFeatures() { -Features features = new Features(MINIMUM_KRAFT_VERSION, -Collections.singletonMap("foo", (short) 2), 123, true); -assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), -features.finalizedFeatures().get(FEATURE_NAME)); -assertEquals((short) 2, -features.finalizedFeatures().get("foo")); -assertEquals(2, features.finalizedFeatures().size()); +public class FeaturesTest { + +@ParameterizedTest +@EnumSource(Features.class) +public void testFromFeatureLevelAllFeatures(Features feature) { +FeatureVersion[] featureImplementations = feature.featureVersions(); +int numFeatures = featureImplementations.length; +for (short i = 1; i < numFeatures; i++) { +assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i)); +} +} + +@ParameterizedTest +@EnumSource(Features.class) +public void testValidateVersionAllFeatures(Features feature) { +for (FeatureVersion featureImpl : feature.featureVersions()) { +// Ensure the minimum bootstrap metadata version is included if no metadata version 
dependency. +Map deps = new HashMap<>(); +deps.putAll(featureImpl.dependencies()); +if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) { Review Comment: I added this to the test so that it has reasonable features passed in. But when this validation is actually called, we pass in ALL features, including the metadata version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619248609 ## server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java: ## @@ -14,37 +14,101 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.server.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -class FeaturesTest { -@Test -public void testKRaftModeFeatures() { -Features features = new Features(MINIMUM_KRAFT_VERSION, -Collections.singletonMap("foo", (short) 2), 123, true); -assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), -features.finalizedFeatures().get(FEATURE_NAME)); -assertEquals((short) 2, -features.finalizedFeatures().get("foo")); -assertEquals(2, features.finalizedFeatures().size()); +public class FeaturesTest { + +@ParameterizedTest +@EnumSource(Features.class) +public void testFromFeatureLevelAllFeatures(Features feature) { +FeatureVersion[] featureImplementations = feature.featureVersions(); +int numFeatures = featureImplementations.length; +for (short i = 1; i < numFeatures; i++) { +assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i)); +} +} + +@ParameterizedTest +@EnumSource(Features.class) +public void testValidateVersionAllFeatures(Features feature) { +for (FeatureVersion featureImpl : feature.featureVersions()) { +// Ensure the minimum bootstrap metadata version is included if no metadata version 
dependency. +Map deps = new HashMap<>(); +deps.putAll(featureImpl.dependencies()); +if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) { Review Comment: No. Features should not be required to declare a dependency on MV. I don't think we need to add this logic everywhere validation is called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619246972 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +114,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { Review Comment: I have to change this because of the other comment you mentioned anyway 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619246506 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: I wouldn't particularly split hairs here since I think both are reasonable. However, I can make it consistent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619224649 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +114,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { Review Comment: usesVersionDefault => releaseVersionSpecified ? ## server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java: ## @@ -14,37 +14,101 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.kafka.server.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -class FeaturesTest { -@Test -public void testKRaftModeFeatures() { -Features features = new Features(MINIMUM_KRAFT_VERSION, -Collections.singletonMap("foo", (short) 2), 123, true); -assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), -features.finalizedFeatures().get(FEATURE_NAME)); -assertEquals((short) 2, -features.finalizedFeatures().get("foo")); -assertEquals(2, features.finalizedFeatures().size()); +public class FeaturesTest { + +@ParameterizedTest +@EnumSource(Features.class) +public void testFromFeatureLevelAllFeatures(Features feature) { +FeatureVersion[] featureImplementations = feature.featureVersions(); +int numFeatures = featureImplementations.length; +for (short i = 1; i < numFeatures; i++) { +assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i)); +} +} + +@ParameterizedTest +@EnumSource(Features.class) +public void testValidateVersionAllFeatures(Features feature) { +for (FeatureVersion featureImpl : feature.featureVersions()) { +// Ensure the minimum bootstrap metadata version is included if no metadata version dependency. +Map deps = new HashMap<>(); +deps.putAll(featureImpl.dependencies()); +if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) { Review Comment: Should we require each feature to include a dependency on MV? Otherwise, we need to add this logic in all places where `Features.validateVersion` is called. 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619106457 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -16,72 +16,135 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values()); + +public static final Features[] FEATURES; +public static final List PRODUCTION_FEATURES; -public final class Features { -private final MetadataVersion version; -private final Map finalizedFeatures; -private final long finalizedFeaturesEpoch; +public static final List PRODUCTION_FEATURE_NAMES; +private final String name; +private final FeatureVersion[] featureVersions; -public static Features fromKRaftVersion(MetadataVersion version) { -return new Features(version, Collections.emptyMap(), -1, true); +Features(String name, + FeatureVersion[] featureVersions) { +this.name = name; +this.featureVersions = featureVersions; } -public Features( -MetadataVersion version, -Map finalizedFeatures, -long finalizedFeaturesEpoch, -boolean kraftMode -) { -this.version = version; -this.finalizedFeatures = new HashMap<>(finalizedFeatures); -this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; -// In KRaft mode, we always include the metadata version in the features map. -// In ZK mode, we never include it. -if (kraftMode) { -this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); -} else { -this.finalizedFeatures.remove(FEATURE_NAME); -} +static { +Features[] enumValues = Features.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.name != TEST_VERSION.featureName()).collect(Collectors.toList()); Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1618450649 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -16,72 +16,135 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values()); + +public static final Features[] FEATURES; +public static final List PRODUCTION_FEATURES; -public final class Features { -private final MetadataVersion version; -private final Map finalizedFeatures; -private final long finalizedFeaturesEpoch; +public static final List PRODUCTION_FEATURE_NAMES; +private final String name; +private final FeatureVersion[] featureVersions; -public static Features fromKRaftVersion(MetadataVersion version) { -return new Features(version, Collections.emptyMap(), -1, true); +Features(String name, + FeatureVersion[] featureVersions) { +this.name = name; +this.featureVersions = featureVersions; } -public Features( -MetadataVersion version, -Map finalizedFeatures, -long finalizedFeaturesEpoch, -boolean kraftMode -) { -this.version = version; -this.finalizedFeatures = new HashMap<>(finalizedFeatures); -this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; -// In KRaft mode, we always include the metadata version in the features map. -// In ZK mode, we never include it. -if (kraftMode) { -this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); -} else { -this.finalizedFeatures.remove(FEATURE_NAME); -} +static { +Features[] enumValues = Features.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.name != TEST_VERSION.featureName()).collect(Collectors.toList()); Review Comment: There is a small bug here. We should use equals, I think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1618018029 ## server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java: ## @@ -14,37 +14,86 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.server.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -class FeaturesTest { -@Test -public void testKRaftModeFeatures() { -Features features = new Features(MINIMUM_KRAFT_VERSION, -Collections.singletonMap("foo", (short) 2), 123, true); -assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), -features.finalizedFeatures().get(FEATURE_NAME)); -assertEquals((short) 2, -features.finalizedFeatures().get("foo")); -assertEquals(2, features.finalizedFeatures().size()); +public class FeaturesTest { + +@ParameterizedTest +@EnumSource(Features.class) +public void testFromFeatureLevelAllFeatures(Features feature) { +FeatureVersion[] featureImplementations = feature.featureVersions(); +int numFeatures = featureImplementations.length; +for (short i = 1; i < numFeatures; i++) { +assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i)); +} +} + +@ParameterizedTest +@EnumSource(Features.class) +public void testValidateVersionAllFeatures(Features feature) { +for (FeatureVersion featureImpl : feature.featureVersions()) { +// Ensure that the feature is valid given the typical metadataVersionMapping and the dependencies. 
+// Note: Other metadata versions are valid, but this one should always be valid. +Features.validateVersion(featureImpl, featureImpl.bootstrapMetadataVersion(), featureImpl.dependencies()); +} } @Test -public void testZkModeFeatures() { -Features features = new Features(MINIMUM_KRAFT_VERSION, -Collections.singletonMap("foo", (short) 2), 123, false); -assertNull(features.finalizedFeatures().get(FEATURE_NAME)); -assertEquals((short) 2, -features.finalizedFeatures().get("foo")); -assertEquals(1, features.finalizedFeatures().size()); +public void testInvalidValidateVersion() { +// Using too low of a MetadataVersion is invalid +assertThrows(IllegalArgumentException.class, +() -> Features.validateVersion( +TestFeatureVersion.TEST_1, +MetadataVersion.IBP_2_8_IV0, +Collections.emptyMap() +) +); + +// Using a version that is lower than the dependency will fail. +assertThrows(IllegalArgumentException.class, + () -> Features.validateVersion( + TestFeatureVersion.TEST_2, + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, Review Comment: I thought it would be easier, but I see there is room for mistakes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1618017128 ## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class FinalizedFeatures { +private final MetadataVersion metadataVersion; +private final Map finalizedFeatures; +private final long finalizedFeaturesEpoch; + +public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { +return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); +} + +public FinalizedFeatures( +MetadataVersion metadataVersion, +Map finalizedFeatures, +long finalizedFeaturesEpoch, +boolean kraftMode +) { +this.metadataVersion = metadataVersion; +this.finalizedFeatures = new HashMap<>(finalizedFeatures); +this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; +// In KRaft mode, we always include the metadata version in the features map. +// In ZK mode, we never include it. 
+if (kraftMode) { +this.finalizedFeatures.put(FEATURE_NAME, metadataVersion.featureLevel()); Review Comment: sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1618017282 ## metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java: ## @@ -61,6 +62,12 @@ public static Map defaultFeatureMap(boolean enableUnstable enableUnstable ? MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); +for (Features feature : Features.PRODUCTION_FEATURES) { +features.put(feature.featureName(), VersionRange.of( +0, +feature.latestProduction() Review Comment: This is in the followup. :) It changes a ton of files so I would prefer to do it separately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1618016824 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -16,72 +16,134 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values()); + +public static final Features[] FEATURES; +public static final List PRODUCTION_FEATURES; -public final class Features { -private final MetadataVersion version; -private final Map finalizedFeatures; -private final long finalizedFeaturesEpoch; +public static final List PRODUCTION_FEATURE_NAMES; +private final String name; +private final FeatureVersion[] featureVersions; -public static Features fromKRaftVersion(MetadataVersion version) { -return new Features(version, Collections.emptyMap(), -1, true); +Features(String name, + FeatureVersion[] featureVersions) { +this.name = name; +this.featureVersions = featureVersions; } -public Features( -MetadataVersion version, -Map finalizedFeatures, -long finalizedFeaturesEpoch, -boolean kraftMode -) { -this.version = version; -this.finalizedFeatures = new HashMap<>(finalizedFeatures); -this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; -// In KRaft mode, we always include the metadata version in the features map. -// In ZK mode, we never include it. 
-if (kraftMode) { -this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); -} else { -this.finalizedFeatures.remove(FEATURE_NAME); -} +static { +Features[] enumValues = Features.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.name != TEST_VERSION.featureName()).collect(Collectors.toList()); +PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> +feature.name).collect(Collectors.toList()); } -public MetadataVersion metadataVersion() { -return version; +public String featureName() { +return name; } -public Map finalizedFeatures() { -return finalizedFeatures; +public FeatureVersion[] featureVersions() { +return featureVersions; } -public long finalizedFeaturesEpoch() { -return finalizedFeaturesEpoch; +public short latestProduction() { +return defaultValue(MetadataVersion.LATEST_PRODUCTION); } -@Override -public boolean equals(Object o) { -if (o == null || !(o.getClass().equals(Features.class))) return false; -Features other = (Features) o; -return version == other.version && -finalizedFeatures.equals(other.finalizedFeatures) && -finalizedFeaturesEpoch == other.finalizedFeaturesEpoch; +/** + * Creates a FeatureVersion from a level. + * + * @param level the level of the feature + * @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. + * @throwsIllegalArgumentException if the feature is not known. + */ +public FeatureVersion fromFeatureLevel(short level) { +return Arrays.stream(featureVersions).filter(featureVersion -> +featureVersion.featureLevel() == level).findFirst().orElseThrow( +() -> new IllegalArgumentException("No feature:" + featureName() + " with feature level " + level)); } -@Override -public int hashCode
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1618016483 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: If we use feature flags to specify MV, we should not default based on the MV but should instead use the latest default for non-specified features. The difference is between specifying MV using `--version-default` and specifying it using `--feature`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1617946240 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +!Option(namespace.getString("release_version")).isEmpty Review Comment: I am wondering why we need to pass in `usesVersionDefault`? Earlier in `getMetadataVersion`, we already resolve `metadataVersion` to `LATEST_PRODUCTION` if it's not explicitly specified. ## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class FinalizedFeatures { +private final MetadataVersion metadataVersion; +private final Map finalizedFeatures; +private final long finalizedFeaturesEpoch; + +public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { +return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); +} + +public FinalizedFeatures( +MetadataVersion metadataVersion, +Map finalizedFeatures, +long finalizedFeaturesEpoch, +boolean kraftMode +) { +this.metadataVersion = metadataVersion; +this.finalizedFeatures = new HashMap<>(finalizedFeatures); +this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; +// In KRaft mode, we always include the metadata version in the features map. +// In ZK mode, we never include it. +if (kraftMode) { +this.finalizedFeatures.put(FEATURE_NAME, metadataVersion.featureLevel()); +} else { +this.finalizedFeatures.remove(FEATURE_NAME); +} +} + +public MetadataVersion metadataVers
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2135851154 Yes @dajac! The code for unstable versions is mostly ready but I need to refactor based on some changes here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613992588 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release_version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +Option(namespace.getString("release_version")).isEmpty Review Comment: I should probably add a test for this 🤦‍♀️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613922117 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { -throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { -if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") -} else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") -} - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { +throw new TerseFailure("Both --release_version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( +metadataRecords, +metadataVersion, +featureNamesAndLevelsMap, +Features.PRODUCTION_FEATURES.asScala.toList, +Option(namespace.getString("release_version")).isEmpty Review Comment: Should it be `!isEmpty`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613761681 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: Yup. I will push the code soon, but tried to write a clear comment about this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613728690 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: > Originally this did not need to be a production ready MV even if the feature is production, but I think we are now flipping this around and saying the feature is production ready iff the MV is production ready. If there is a 1-to-1 mapping from feature to MV, reasoning about production readiness in one place seems simpler? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613656338 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -21,16 +21,17 @@ public enum TestFeatureVersion implements FeatureVersion { -TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), +// TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), +// TEST_2 released right before MV 3.8-IVO was released, and it depends on this metadata version TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel())); -private short featureLevel; -private MetadataVersion metadataVersionMapping; -private Map dependencies; +private final short featureLevel; +private final MetadataVersion metadataVersionMapping; +private final Map dependencies; public static final String FEATURE_NAME = "test.feature.version"; -public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; +public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1; Review Comment: 🤦♀️ I realized I fixed this in the next PR. I will fix this here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613639487 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: Ok -- so if I understand correctly, the request is to remove latest production per feature and to simply mark as production ready if the MV that corresponds to it is production ready? The only case where this is tricky is when we use the --feature flag and we need to find the latest production version. To do that, we will need to get the latest production Metadata and map that to a feature. It's doable though, so I can proceed with that. As for >If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. 
Originally this did not need to be a production ready MV even if the feature is production, but I think we are now flipping this around and saying the feature is production ready iff the MV is production ready. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613639487 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: Ok -- so if I understand correctly, the request is to remove latest production per feature and to simply mark as production ready if the MV that corresponds to it is production ready? The only case where this is tricky is when we use the --feature flag and we need to find the latest production version. To do that, we will need to get the latest production Metadata and map that to a feature. It's doable though, so I can proceed with that. As for >If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. 
Originally this did not need to be a production ready MV even if the feature is production, but I think we are now flipping this around and saying the feature is production ready iff the MV is production ready. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1613634445 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -21,16 +21,17 @@ public enum TestFeatureVersion implements FeatureVersion { -TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), +// TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), +// TEST_2 released right before MV 3.8-IVO was released, and it depends on this metadata version TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel())); -private short featureLevel; -private MetadataVersion metadataVersionMapping; -private Map dependencies; +private final short featureLevel; +private final MetadataVersion metadataVersionMapping; +private final Map dependencies; public static final String FEATURE_NAME = "test.feature.version"; -public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; +public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1; Review Comment: It does not since in Features, I don't define a production version. I set this for testing purposes. But Features.PRODUCTION_VERSIONS does not contain this feature. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612454534 ## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class FinalizedFeatures { +private final MetadataVersion metadataVersion; +private final Map finalizedFeatures; +private final long finalizedFeaturesEpoch; + +public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { +return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); +} + +public FinalizedFeatures( +MetadataVersion metadataVersion, +Map finalizedFeatures, +long finalizedFeaturesEpoch, +boolean kraftMode +) { +this.metadataVersion = metadataVersion; +this.finalizedFeatures = new HashMap<>(finalizedFeatures); +this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; +// In KRaft mode, we always include the metadata version in the features map. +// In ZK mode, we never include it. 
+if (kraftMode) { +this.finalizedFeatures.put(FEATURE_NAME, metadataVersion.featureLevel()); +} else { +this.finalizedFeatures.remove(FEATURE_NAME); +} +} + +public MetadataVersion metadataVersion() { +return metadataVersion; +} + +public Map finalizedFeatures() { +return finalizedFeatures; +} + +public long finalizedFeaturesEpoch() { +return finalizedFeaturesEpoch; +} + +@Override +public boolean equals(Object o) { +if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) return false; +FinalizedFeatures other = (FinalizedFeatures) o; +return metadataVersion == other.metadataVersion && +finalizedFeatures.equals(other.finalizedFeatures) && +finalizedFeaturesEpoch == other.finalizedFeaturesEpoch; +} + +@Override +public int hashCode() { +return Objects.hash(metadataVersion, finalizedFeatures, finalizedFeaturesEpoch); +} + +@Override +public String toString() { +return "Features" + +"(version=" + metadataVersion + Review Comment: version => metadataVersion ? ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the 
metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: > If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. I thought that's the model being implem
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612863802 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -21,16 +21,17 @@ public enum TestFeatureVersion implements FeatureVersion { -TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), +// TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), +// TEST_2 released right before MV 3.8-IVO was released, and it depends on this metadata version TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel())); -private short featureLevel; -private MetadataVersion metadataVersionMapping; -private Map dependencies; +private final short featureLevel; +private final MetadataVersion metadataVersionMapping; +private final Map dependencies; public static final String FEATURE_NAME = "test.feature.version"; -public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; +public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1; Review Comment: Does it mean that the broker will show the test feature to the clients? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: I think it would be confusing if a user specifies the latest MV version and the result would be different from when nothing is specified (implied assumption is that nothing is shortcut for latest MV known to the tool). It would also be confusing if by default (nothing is specified) we don't have all features set to the latest versions known to the tool. We can provide guidance to new features developers in the comments (and the test feature example) and add a unit test that enforces the equivalence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612404062 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: Ok sorry to be a little all over the place here. I think we should have semantics where on each new feature version released to production, we have a MV released to production. In this case, whether we specify latest production (as it did before) or use an empty optional to specify the latest production feature SHOULD be equivalent. The only case it is not is if someone improperly doesn't set the metadataMapping correctly. 
There isn't a great way to enforce that in the code itself (though I can add a test for it), but I guess the question is whether we prefer the empty-Optional approach, which ensures the latest features are provided, OR the latest-production-metadata-version approach, which is simpler but risks not picking up features if folks implement the method incorrectly or don't update the MV. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612321070 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: If we want to change it back, I will also update the comments in the metadataVersionMapping method as it will not be correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: We do not. This was changed yesterday and the code was as you say in the comment. > Do we get the same result by passing in that metadataVersion to feature.defaultValue() If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. If we want to codify this and require a new MV (used only for mapping a default) for every new feature to be created when we mark the feature as production ready, I can switch it back to how it was yesterday. I originally changed it in the case where we want to piggyback on the next MV and we may mark the feature as production ready but not the MV. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: We do not. This was changed yesterday and the code was as you say in the comment. Here is the behavior as it stands now: > Do we get the same result by passing in that metadataVersion to feature.defaultValue() If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. If we want to codify this and require a new MV (used only for mapping a default) for every new feature to be created when we mark the feature as production ready, I can switch it back to how it was yesterday. 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: We do not. This was changed yesterday and the code was as you say in the comment. > Do we get the same result by passing in that metadataVersion to feature.defaultValue() If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. If we want to codify this and require a new MV (used only for mapping a default) for every new feature to be created when we mark the feature as production ready, I can switch it back to how it was yesterday. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314478 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -141,6 +189,9 @@ object StorageTool extends Logging { formatParser.addArgument("--release-version", "-r"). action(store()). help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") +formatParser.addArgument("--feature"). + help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). Review Comment: We can do so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314269 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +105,51 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features]): Unit = { +// If we are using --version-default, the default is based on the metadata version. Review Comment: This was a typo. I will fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314113 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -156,16 +200,27 @@ object StorageTool extends Logging { def getMetadataVersion( namespace: Namespace, +featureNamesAndLevelsMap: Map[String, java.lang.Short], defaultVersionString: Option[String] ): MetadataVersion = { val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } -Option(namespace.getString("release_version")) - .map(ver => MetadataVersion.fromVersionString(ver)) - .getOrElse(defaultValue) +val releaseVersionTag = Option(namespace.getString("release_version")) +val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) + +(releaseVersionTag, featureTag) match { + case (Some(_), Some(_)) => +throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.") Review Comment: Right. I can update this check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612313415 ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +76,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { -Features.supportedFeatures( - java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, +val features = new util.HashMap[String, SupportedVersionRange]() + features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableMetadataVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel - } -))) + })) + FeatureVersion.PRODUCTION_FEATURES.forEach { feature => Review Comment: This will be in a followup. I'm working on it in the background. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1602305075 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersionUtils.FeatureVersionImpl { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), +TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), +TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()); + +private short featureLevel; +private MetadataVersion metadataVersionMapping; +private Map dependencies; Review Comment: Should we make those instance vals final? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -156,16 +200,27 @@ object StorageTool extends Logging { def getMetadataVersion( namespace: Namespace, +featureNamesAndLevelsMap: Map[String, java.lang.Short], defaultVersionString: Option[String] ): MetadataVersion = { val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } -Option(namespace.getString("release_version")) - .map(ver => MetadataVersion.fromVersionString(ver)) - .getOrElse(defaultValue) +val releaseVersionTag = Option(namespace.getString("release_version")) +val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) + +(releaseVersionTag, featureTag) match { + case (Some(_), Some(_)) => +throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.") Review Comment: We should disallow the case where both --release_version and --feature are used, but --feature doesn't include metadata, right? ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -141,6 +189,9 @@ object StorageTool extends Logging { formatParser.addArgument("--release-version", "-r"). action(store()). help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") +formatParser.addArgument("--feature"). + help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). Review Comment: Should we add a shorthand for --feature like other options? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +105,51 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features]): Unit = { +// If we are using --version-default, the default is based on the metadata version. Review Comment: Hmm, does the storage tool support the --version-default option? ## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. S
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1611981645 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: Sorry I think I'm not explaining this clearly. Once it is released, this value shouldn't change. I wish KIP-1014 was completed so this could be clearer. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1611979308 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), Review Comment: I've made this change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1611842086 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: Understood. The downside of this is that you have to ensure that you change it when the next unstable one is released and the new feature is not ready to be promoted. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1611825497 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: My thought process is that we will just attach it to the current next unstable one. If there is no such MV we create it and if there is still no feature attached to it, we will just release it at the release. 
I probably need to write up a formal doc on how this will work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1611615148 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: > How do we know the release version when we create the feature. My understanding is that the new version of the feature will start as an unstable one. 
When we promote it to production ready, we can attach it to the correct MV (the latest one available in the release, I suppose). ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +75,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { -Features.supportedFeatures( - java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, +val features = new util.HashMap[String, SupportedVersionRange]() + features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableMetadataVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel - } -))) + })) +org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { feature => Review Comment: Could we import `org.apache.kafka.server.common.Features.PRODUCTION_FEATURES` as we only use `PRODUCTION_FEATURES` in the end? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() + +val allNonZeroFeaturesAndLevels: ArrayBuffer[FeatureVersion] = mutable.ArrayBuffer[FeatureVersion]() Review Comment: nit: There is an extra space. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610507371 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), Review Comment: I tried this, but the logic for defaults breaks when we don't have a 0 version. ``` val allFeaturesAndLevels: List[FeatureVersion] = allFeatures.map { feature => val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault)) feature.fromFeatureLevel(level) } ``` Here, I suppose we could do some work to filter out 0s and assume in the rest of the code that a missing value in the map = 0. If that is preferred I can do that instead, but I also don't think the 0 version necessarily hurts anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610472137 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -64,14 +63,16 @@ public enum Features { PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> feature.usedInProduction).collect(Collectors.toList()); +PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> +feature.name).collect(Collectors.toList()); } public String featureName() { return name; } public FeatureVersion[] features() { Review Comment: sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610471028 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: How do we know the release version when we create the feature. The release version can change, so should we change it as each new MV is added? I suppose you mean the latest MV when the feature is released. 
This is also what I did but adding 1 to every MV for the reasons I explained [here](https://github.com/apache/kafka/pull/15685#discussion_r1610248735) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610471028 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: How do we know the release version when we create the feature. The release version can change, so should we change it as each new MV is added? 
This is also what I did but adding 1 to every MV for the reasons I explained [here](https://github.com/apache/kafka/pull/15685#discussion_r1610248735) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610437612 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), Review Comment: Version 0 == "feature doesn't exist", does it need to be explicitly codified in the enum? ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -470,8 +471,8 @@ BrokerFeature processRegistrationFeature( // A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled). // As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case Review Comment: Comment seems to be outdated w.r.t. latest logic? 
## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: Can we make it map to the release version? If not, can we add a comment explaining the logic? 
Intuitively, if we have a feature like - FOO(1, 42) // released version 1 when MV got bumped to 42 - FOO(2, 77) // released version 2 when MV got bumped to 77 - FOO(3, 77) // released version 3 at the same time as 2 we should be able to discover that if we got MV 45, then FOO=1 because 1 was available at 42 and 2 & 3 are not available yet. If we got MV 78, then it should be 3 because that was the max available at 77. If we got MV 30, then we should get 0 (implicit) because the feature wasn't available yet. Using the same table, we can determine that if feature version 1 was selected with MV 41, then it's an invalid combination; similarly, if feature version 2 or 3 was selected with MV 45, it's an invalid combination. And we can select 1, 2 or 3 when MV is 77. ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -64,14 +63,16 @@ public enum Features { PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> feature.usedInProduction).collect(Collectors.toList()); +PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> +feature.name).collect(Collectors.toList()); } public String featureName() { return name; }
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610251718 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -16,72 +16,135 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); -public final class Features { -private final MetadataVersion version; -private final Map finalizedFeatures; -private final long finalizedFeaturesEpoch; +public static final Features[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersion[] features; +private final CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; -public static Features fromKRaftVersion(MetadataVersion version) { -return new Features(version, Collections.emptyMap(), -1, true); +Features(String name, + FeatureVersion[] features, + CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; } -public Features( -MetadataVersion version, -Map finalizedFeatures, -long finalizedFeaturesEpoch, -boolean kraftMode -) { -this.version = version; -this.finalizedFeatures = new HashMap<>(finalizedFeatures); -this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; -// In KRaft mode, we always include the metadata version in the features map. -// In ZK mode, we never include it. 
-if (kraftMode) { -this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); -} else { -this.finalizedFeatures.remove(FEATURE_NAME); -} +static { +Features[] enumValues = Features.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); } -public MetadataVersion metadataVersion() { -return version; +public String featureName() { +return name; } -public Map finalizedFeatures() { -return finalizedFeatures; +public FeatureVersion[] features() { +return features; } -public long finalizedFeaturesEpoch() { -return finalizedFeaturesEpoch; +/** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. + * @throwsIllegalArgumentException if the feature name is not valid (not implemented for this method) + */ +public FeatureVersion fromFeatureLevel(short level) { +return createFeatureVersionMethod.fromFeatureLevel(level); Review Comment: Iterate through all the features and all the levels of each feature? We could I suppose, but this could get slow if we have a lot of different features and levels. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610250283 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -16,72 +16,135 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. 
+ */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); -public final class Features { -private final MetadataVersion version; -private final Map finalizedFeatures; -private final long finalizedFeaturesEpoch; +public static final Features[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersion[] features; +private final CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; -public static Features fromKRaftVersion(MetadataVersion version) { -return new Features(version, Collections.emptyMap(), -1, true); +Features(String name, + FeatureVersion[] features, + CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; } -public Features( -MetadataVersion version, -Map finalizedFeatures, -long finalizedFeaturesEpoch, -boolean kraftMode -) { -this.version = version; -this.finalizedFeatures = new HashMap<>(finalizedFeatures); -this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; -// In KRaft mode, we always include the metadata version in the features map. -// In ZK mode, we never include it. 
-if (kraftMode) { -this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); -} else { -this.finalizedFeatures.remove(FEATURE_NAME); -} +static { +Features[] enumValues = Features.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); } -public MetadataVersion metadataVersion() { -return version; +public String featureName() { +return name; } -public Map finalizedFeatures() { -return finalizedFeatures; +public FeatureVersion[] features() { +return features; } -public long finalizedFeaturesEpoch() { -return finalizedFeaturesEpoch; +/** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. + * @throwsIllegalArgumentException if the feature name is not valid (not implemented for this method) + */ +public FeatureVersion fromFeatureLevel(short level) { +return createFeatureVersionMethod.fromFeatureLevel(level); } -@Override -public boolean equals(Object o) { -if (o == null || !(o.getClass().equals(Features.class))) return false; -Features other = (Features) o; -return version == other.version && -finalizedFeatures.equals(other.finalizedFeatures) && -finalizedFeaturesEpoch == other.finalizedFeaturesEpoch; +/** + * A method to validate the feature can be set. If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVers
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610249426 ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +75,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { -Features.supportedFeatures( - java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, +val features = new util.HashMap[String, SupportedVersionRange]() + features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableMetadataVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel - } -))) + })) +org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { feature => Review Comment: There are two imports for Features in this file from different modules. Java doesn't allow aliasing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610248735 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: Hmm, there are a lot of components that are all getting mixed around here. 1. All features require IBP-3.3.0-IV0. This is because this is the minimum MV to bootstrap and write a feature record :) This is encoded in Features#validateVersion and doesn't need to be specified for all features since it will be required by all. 2. 
Some features may require a specific version of another feature in order to be set. None of the features we proposed (transaction/group coordinator) need this, but it was requested in the KIP, so I have implemented a framework to do so. 3. We need to set a reasonable default feature version for when folks bootstrap using only a metadata version. There are a few options here. One is to set all features to 0. We ended up deciding to take the latest features as default. There is a small wrinkle here in that, for a given MV, we may introduce a feature version after the code that releases the MV. If folks are running off trunk, they will have different features for the same MV if we choose the current MV, and that's why I suggest the next one. This bootstrap method is only used for point 3; points 1 and 2 are covered by the validateVersion method and by enumerating non-3.3 dependencies when defining the featureVersion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1609917313 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: > I would like the mapping to remain consistent for MV. If we do the current mapping, some versions/images with MV X will have feature version Y and some will not (since it wasn't created when MV X was first released) Isn't it something that will happen anyway? For instance, I will add `group.version=1` soon and it will require an old MV. Likely the oldest one supported by kraft. 
I am not too opinionated on this one, though. If you believe that this is the right approach, I trust you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1609917313 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: > I would like the mapping to remain consistent for MV. If we do the current mapping, some versions/images with MV X will have feature version Y and some will not (since it wasn't created when MV X was first released) Isn't it something that will happen anyway? For instance, I will add `group.version=1` soon and it will require an old MV. Likely the oldest one supported by kraft. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1609158284 ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +75,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { -Features.supportedFeatures( - java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, +val features = new util.HashMap[String, SupportedVersionRange]() + features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableMetadataVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel - } -))) + })) +org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { feature => Review Comment: Any reasons this is a fully qualified class name and not just imported as it's usually done? ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -16,72 +16,135 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. 
+ */ +public enum Features { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. + */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); -public final class Features { -private final MetadataVersion version; -private final Map finalizedFeatures; -private final long finalizedFeaturesEpoch; +public static final Features[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersion[] features; +private final CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; -public static Features fromKRaftVersion(MetadataVersion version) { -return new Features(version, Collections.emptyMap(), -1, true); +Features(String name, + FeatureVersion[] features, + CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; } -public Features( -MetadataVersion version, -Map finalizedFeatures, -long finalizedFeaturesEpoch, -boolean kraftMode -) { -this.version = version; -this.finalizedFeatures = new HashMap<>(finalizedFeatures); -this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; -// In KRaft mode, we always include the metadata version in the features map. -// In ZK mode, we never include it. 
-if (kraftMode) { -this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); -} else { -this.finalizedFeatures.remove(FEATURE_NAME); -} +static { +Features[] enumValues = Features.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); } -public MetadataVersion metadataVersion() { -return version; +public String featureName() { +return name; } -public Map finalizedFeatures() { -return finalizedFeatures; +public FeatureVersion[] features() { +return features; } -public long finalizedFeaturesEpoch() { -return finalizedFeaturesEpoch; +/** + * Creates a FeatureVersion from
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1608567147 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: I would like the mapping to remain consistent for MV. If we do the current mapping, some versions/images with MV X will have feature version Y and some will not (since it wasn't created when MV X was first released) Sorry I forgot to change the name, I will do that today 👍 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1608208322 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: I am still confused by this one. Based on our offline discussion, my understanding is that this is only used during bootstrapping. We should try to make this clear in the name and in the javadoc. > (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) For my understanding, why do we require to be the next one? 
Requiring the current seems more natural but I may be missing something. ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); + +/** + * A mapping from feature to level for all features that this feature depends on. If this feature doesn't + * depend on any others, return an empty map. 
+ * For example, say feature X level x relies on feature Y level y: + * feature (X level x).dependencies() will return (Y -> y) + */ +Map dependencies(); + +/** + * Utility method to map a list of FeatureVersion to a map of feature name to feature level + */ +static Map featureImplsToMap(List features) { +return features.stream().collect(Collectors.toMap(FeatureVersion::featureName, FeatureVersion::featureLevel)); +} Review Comment: This one feels a bit weird in this interface. Should we move it to `Features`? ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +105,49 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +Sy
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1604173884 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { Review Comment: Hmmm. This was partially due to the evolution of the api. Originally all the interfaces here were methods that were used as parameters for the FeatureVersion enums. I can try to reorganize this, but I may need some time to think about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1604168230 ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +76,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { Review Comment: I can look into adding something. The problem is I don't add any new production features in this change. I would add one in the next PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1604168230 ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +76,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { Review Comment: I can look into adding something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603893834 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + +interface FeatureVersionImpl { +short featureLevel(); Review Comment: I had a javadoc that said "the level of the feature" but didn't know if that was silly. I can add something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603875839 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -469,4 +522,28 @@ object StorageTool extends Logging { } 0 } + + private def parseNameAndLevel(input: String): Array[String] = { +val equalsIndex = input.indexOf("=") +if (equalsIndex < 0) + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.") +val name = input.substring(0, equalsIndex).trim +val levelString = input.substring(equalsIndex + 1).trim +try { + levelString.toShort +} catch { + case _: Throwable => +throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.") +} +Array[String](name, levelString) Review Comment: I pulled this method from FeatureCommand (originally I wanted them to use a shared method from a helper but that was a headache so I duplicated the method.) I can revise this one though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603437357 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum FeatureVersion { + +/** + * Features defined. 
If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature. + */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); + +public static final FeatureVersion[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersionUtils.FeatureVersionImpl[] features; +private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; + +FeatureVersion(String name, + FeatureVersionUtils.FeatureVersionImpl[] features, + FeatureVersionUtils.CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; +} + +static { +FeatureVersion[] enumValues = FeatureVersion.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); +} + +public String featureName() { +return name; +} + +/** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersionImpl for the feature the enum is based on. + * @throwsIllegalArgumentException if the feature name is not valid (not implemented for this method) + */ +public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short level) { +return createFeatureVersionMethod.fromFeatureLevel(level); +} + +/** + * A method to validate the feature can be set. 
If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersionUtils.FeatureVersionImpl#dependencies()} + * + * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. + * + * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param metadataVersion the metadata version we have (or want to set) + * @param features the feature versions (besides MetadataVersion) we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ +public static void validateVersion(FeatureVersionUtils.FeatureVer
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603398767 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -459,18 +459,20 @@ BrokerFeature processRegistrationFeature( FinalizedControllerFeatures finalizedFeatures, BrokerRegistrationRequestData.Feature feature ) { -Optional finalized = finalizedFeatures.get(feature.name()); -if (finalized.isPresent()) { -if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) { -throw new UnsupportedVersionException("Unable to register because the broker " + -"does not support version " + finalized.get() + " of " + feature.name() + -". It wants a version between " + feature.minSupportedVersion() + " and " + -feature.maxSupportedVersion() + ", inclusive."); -} -} else { -log.warn("Broker {} registered with feature {} that is unknown to the controller", +int defaultVersion = feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default value for MetadataVersion is 1 not 0. +short finalized = finalizedFeatures.getOrDefault(feature.name(), (short) defaultVersion); +if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized)) { +throw new UnsupportedVersionException("Unable to register because the broker " + +"does not support version " + finalized + " of " + feature.name() + +". It wants a version between " + feature.minSupportedVersion() + " and " + +feature.maxSupportedVersion() + ", inclusive."); +} +// A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled). +// As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case +// an intended feature is not being set. 
+if (finalized == 0) +log.debug("Broker {} registered with feature {} that is either unknown or version 0 on the controller", Review Comment: It's really hard to differentiate between disabled and unknown because the protocol when setting a feature to 0 is to remove it. You will not have a record when you set it to 0 because of this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603396094 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + +interface FeatureVersionImpl { +short featureLevel(); + +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); + +/** + * A mapping from feature to level for all features that this feature depends on. If this feature doesn't + * depend on any others, return an empty map. + * For example, say feature X level x relies on feature Y level y: + * feature (X level x).dependencies() will return (Y -> y) + */ +Map dependencies(); Review Comment: We could do that. 
I was wondering if this would be the format we typically see in the upgrade tool or if we would have to convert everything. I think in the records in the metadata we store it as Map for features. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603394445 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersionUtils.FeatureVersionImpl { Review Comment: I was going to keep it for testing. We have a SimpleExampleMessage and I think a few other placeholder classes so I thought that was in line with the project. I can consider removing it if we think we get sufficient coverage with other methods. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603378178 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum FeatureVersion { + +/** + * Features defined. 
If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature. + */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); + +public static final FeatureVersion[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersionUtils.FeatureVersionImpl[] features; +private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; + +FeatureVersion(String name, + FeatureVersionUtils.FeatureVersionImpl[] features, + FeatureVersionUtils.CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; +} + +static { +FeatureVersion[] enumValues = FeatureVersion.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); +} + +public String featureName() { +return name; +} + +/** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersionImpl for the feature the enum is based on. + * @throwsIllegalArgumentException if the feature name is not valid (not implemented for this method) + */ +public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short level) { +return createFeatureVersionMethod.fromFeatureLevel(level); +} + +/** + * A method to validate the feature can be set. 
If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersionUtils.FeatureVersionImpl#dependencies()} + * + * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. + * + * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param metadataVersion the metadata version we have (or want to set) + * @param features the feature versions (besides MetadataVersion) we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ +public static void validateVersion(FeatureVersionUtils.FeatureV
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603376757 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum FeatureVersion { + +/** + * Features defined. 
If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature. + */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); + +public static final FeatureVersion[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersionUtils.FeatureVersionImpl[] features; +private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; + +FeatureVersion(String name, + FeatureVersionUtils.FeatureVersionImpl[] features, + FeatureVersionUtils.CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; +} + +static { +FeatureVersion[] enumValues = FeatureVersion.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); +} + +public String featureName() { +return name; +} + +/** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersionImpl for the feature the enum is based on. + * @throwsIllegalArgumentException if the feature name is not valid (not implemented for this method) + */ +public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short level) { +return createFeatureVersionMethod.fromFeatureLevel(level); +} + +/** + * A method to validate the feature can be set. 
If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersionUtils.FeatureVersionImpl#dependencies()} + * + * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. + * + * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param metadataVersion the metadata version we have (or want to set) + * @param features the feature versions (besides MetadataVersion) we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ +public static void validateVersion(FeatureVersionUtils.FeatureV
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603375064 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + +interface FeatureVersionImpl { +short featureLevel(); + +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: No. This is not the minimum version. This is the first **image version** that supports the feature. I wanted to have a mapping to image version, but that was heavily argued against on the mailing thread so I have to use MV as a proxy. This value is only a DEFAULT if no value is set during bootstrapping. As long as the feature is in the image, you can set it regardless of the MV set (production ready or not.) 
Setting the version is completely independent from the MV set. ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + +interface FeatureVersionImpl { +short featureLevel(); + +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: No. This is not the minimum version. This is the first **image version** that supports the feature. I wanted to have a mapping to image version, but that was heavily argued against on the mailing thread so I have to use MV as a proxy. This value is only a DEFAULT if no value is set during bootstrapping but MV is set. As long as the feature is in the image, you can set it regardless of the MV set (production ready or not.) Setting the version is completely independent from the MV set. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603375064 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + +interface FeatureVersionImpl { +short featureLevel(); + +String featureName(); + +/** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ +MetadataVersion metadataVersionMapping(); Review Comment: No. This is not the minimum version. This is the first **image version** that supports the feature. I wanted to have a mapping to image version, but that was heavily argued against on the mailing thread. This value is only a DEFAULT if no value is set during bootstrapping. As long as the feature is in the image, you can set it regardless of the MV set (production ready or not.) Setting the version is completely independent from the MV set. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1603368871 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + +interface FeatureVersionImpl { Review Comment: Features is already a file in this directory :( but I will see what I can do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1602958576 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { Review Comment: Is there a particular reason for grouping those two interfaces in this one? I am asking because at first I thought that `FeatureVersionUtils.java` was a utils library (e.g. useful functions). I did not realize that it was actually the one that contains the interface that one must implement. ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum FeatureVersion { + +/** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature. 
+ */ +TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); + +public static final FeatureVersion[] FEATURES; +public static final List PRODUCTION_FEATURES; +private final String name; +private final FeatureVersionUtils.FeatureVersionImpl[] features; +private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod; +private final boolean usedInProduction; + +FeatureVersion(String name, + FeatureVersionUtils.FeatureVersionImpl[] features, + FeatureVersionUtils.CreateMethod createMethod, + boolean usedInProduction) { +this.name = name; +this.features = features; +this.createFeatureVersionMethod = createMethod; +this.usedInProduction = usedInProduction; +} + +static { +FeatureVersion[] enumValues = FeatureVersion.values(); +FEATURES = Arrays.copyOf(enumValues, enumValues.length); + +PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> +feature.usedInProduction).collect(Collectors.toList()); +} + +public String featureName() { +return name; +} + +/** + * Creates a FeatureVersion from a given name and level with the
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1597305375 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} + +public short featureLevel() { +return featureLevel; +} + +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +// version 1 depends on metadata.version 3.3-IVO +if (featureLevel >= 1 && metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0)) +throw new IllegalArgumentException(FEATURE_NAME + " could not be set to " + featureLevel + +" because it depends on metadata.version=14 (" + MetadataVersion.IBP_3_3_IV0 + ")"); +} + +public static TestFeatureVersion metadataVersionMapping(MetadataVersion metadataVersion) { +if (metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) { Review Comment: This will be simplified when I fix the above. Potentially we can even include the 3.3 check in the FeatureVersion class since it will apply to all features and doesn't change on a per feature basis. Stay tuned for some cleanups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1597304812 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} Review Comment: I have a plan for this but didn't quite get to it. Maybe we want to do this and maybe we don't. One idea is to have all features have a standard pattern/class for the fields. This would contain the metadata version mapping and a Map of the required other features. Not sure if we will need any more complicated validation logic than that. We could leave the opportunity to have a more complicated validate method if we choose. 
We can also show an example implementation in the TestFeatureVersion and folks can choose to replicate it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1597303887 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -258,6 +259,13 @@ public enum MetadataVersion { this.didMetadataChange = didMetadataChange; } +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +} Review Comment: I removed this for now, we can add it back later if we want MetadataVersion to implement our interface. I think there are pros and cons for doing so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1594889173 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -258,6 +259,13 @@ public enum MetadataVersion { this.didMetadataChange = didMetadataChange; } +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +} Review Comment: Will do. I think the main thing was that KafkaConfig can't be passed due to directory structure. The important config is unstable version enablement, so in a follow-up, I plan to include it in this method signature and include logic to support unstable versioning for all versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1594886684 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} Review Comment: Yes, assuming we specify metadata version as a numerical value and not a string, as is done in many places in the code. As per the KIP, validation may involve other features besides metadata version, so we will need to consider how that will be implemented. (I didn't have any example of such interdependence, but it was requested that I support it.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1594885949 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} + +public short featureLevel() { +return featureLevel; +} + +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +// version 1 depends on metadata.version 3.3-IVO +if (featureLevel >= 1 && metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0)) +throw new IllegalArgumentException(FEATURE_NAME + " could not be set to " + featureLevel + +" because it depends on metadata.version=14 (" + MetadataVersion.IBP_3_3_IV0 + ")"); +} + +public static TestFeatureVersion metadataVersionMapping(MetadataVersion metadataVersion) { +if (metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) { Review Comment: It's a little confusing, but the idea is that as long as the version is less than 3.8, we use 0. So for MV 3.3 -> 3.8 we will use test_0. The thing about 3.3 is that we can't set any version (test, transactions, etc.) if the MV is < 3.3, since that is the version that introduced the feature records. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1594884853 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * This is an interface for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified interface for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public interface FeatureVersion { + +/** + * Features currently used in production. If a feature is included in this list, it will also be specified when + * formatting a cluster via the StorageTool. 
MetadataVersion is handled separately, so it is not included here. + * + * When a feature is added here, make sure it has a mapping in {@link #defaultValue} and {@link #createFeature}. + * See {@link TestFeatureVersion} as an example. + */ +List PRODUCTION_FEATURES = Arrays.asList(); Review Comment: I can take a look at that approach. One thing that I tried to do is get the existing MetadataVersion to implement the interface. It is already defined as an enum though. Maybe though if we think it is not worthwhile to include MetadataVersion in this set, we can design something from the ground up that is less clunky. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1591704034 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -258,6 +259,13 @@ public enum MetadataVersion { this.didMetadataChange = didMetadataChange; } +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +} Review Comment: Could we add a comment on why validation is not needed? ## clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java: ## @@ -26,7 +26,7 @@ /** * Represents an immutable basic version range using 2 attributes: min and max, each of type short. * The min and max attributes need to satisfy 2 rules: - * - they are each expected to be >= 1, as we only consider positive version values to be valid. + * - they are each expected to be >= 0, as we only consider positive version values to be valid. Review Comment: Nit: to be precise we should change 'positive' -> 'non-negative' :-) ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} Review Comment: I wonder if instead of doing manual implementation of validation and mapping, we could have minMetadataVersion as an enum argument, then validation would be just comparison, and for mapping we'd need to enumerate in reverse order and pick the first we find that supports the metadata version. ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * This is an interface for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. 
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * + * Having a unified interface for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public interface FeatureVersion { + +/** + * Features currently used in production. If a feature is included in this list, it will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * When a feature is added here, make sure it has a mapping in {@link #defaultValue} and {@link #createFeature}. + * See {@link TestFeatureVersion} as an example. + */ +
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2076088235 Separately, I need to fix the update tool. It will always say the finalized version is 0 if the tool knows of the feature, even if the broker doesn't include it in the list of finalized features. ``` Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.7-IV4 Epoch: 17 Feature: test.feature.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 17 ``` ``` "supportedFeatures":[{"name":"metadata.version","minVersion":1,"maxVersion":19},{"name":"test.feature.version","minVersion":0,"maxVersion":1}],"finalizedFeaturesEpoch":17,"finalizedFeatures":[{"name":"metadata.version","maxVersionLevel":19,"minVersionLevel":19}]} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2076086916 I've cleaned up the code to not set the record in the storage tool when the version is 0. I also cleaned up the log since it is not always the case that the controller doesn't know the version. For now, 0 is a reasonable default. Here are the controller logs when I set the test version to 0 and to 1. ``` [2024-04-24 17:15:05,566] INFO [QuorumController id=1] Creating new QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:15:05,568] INFO [QuorumController id=1] Becoming the active controller at epoch 1, next write offset 1. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:15:05,570] WARN [QuorumController id=1] Performing controller activation. The metadata log appears to be empty. Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap source 'the binary bootstrap metadata file: /tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed BeginTransactionRecord(name='Bootstrap records') at offset 1. (org.apache.kafka.controller.OffsetControlManager) [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed a FeatureLevelRecord setting metadata.version to 3.7-IV4 (org.apache.kafka.controller.FeatureControlManager) [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed EndTransactionRecord() at offset 4. 
(org.apache.kafka.controller.OffsetControlManager) [2024-04-24 17:15:05,645] INFO [QuorumController id=1] Replayed RegisterControllerRecord contaning ControllerRegistration(id=1, incarnationId=h3WYlEEtTUCG6nOjFnIQxQ, zkMigrationReady=false, listeners=[Endpoint(listenerName='CONTROLLER', securityProtocol=PLAINTEXT, host='10.200.4.27', port=9093)], supportedFeatures={metadata.version: 1-19, test.feature.version: 0-1}). (org.apache.kafka.controller.ClusterControlManager) [2024-04-24 17:15:05,686] INFO [QuorumController id=1] Replayed initial RegisterBrokerRecord for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=6, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) (org.apache.kafka.controller.ClusterControlManager) [2024-04-24 17:15:05,745] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=7, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) (org.apache.kafka.controller.ClusterControlManager) [2024-04-24 17:15:05,786] INFO [QuorumController id=1] The request from broker 1 to unfence has been granted because it has caught up with the offset of its register broker record 7. 
(org.apache.kafka.controller.BrokerHeartbeatManager) [2024-04-24 17:15:05,788] INFO [QuorumController id=1] Replayed BrokerRegistrationChangeRecord modifying the registration for broker 1: BrokerRegistrationChangeRecord(brokerId=1, brokerEpoch=7, fenced=-1, inControlledShutdown=0, logDirs=[]) (org.apache.kafka.controller.ClusterControlManager) ``` ``` [2024-04-24 17:16:29,048] INFO [QuorumController id=1] Creating new QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:16:29,050] INFO [QuorumController id=1] Becoming the active controller at epoch 1, next write offset 1. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:16:29,051] WARN [QuorumController id=1] Performing controller activation. The metadata log appears to be empty. Appending 2 bootstrap record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap source 'the binary bootstrap metadata file: /tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster. (org.apache.kafka.controller.QuorumController) [2024-04-24 17:16:29,052] INFO [QuorumController id=1] Replayed BeginTransactionRecord(name='Bootstrap records') at offset 1. (org.apache.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2065435357 I may need to figure out how to deal with the following log output when setting the version to 0. ``` Replayed a FeatureLevelRecord removing feature test.feature.version ``` ``` Broker 1 registered with feature test.feature.version that is unknown to the controller (org.apache.kafka.controller.ClusterControlManager) [2024-04-18 15:32:25,440] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=pihYtx_qSSKlJ2K31eGFOw, brokerEpoch=8, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[T1fHZ5DjOC1Dk5CviOHPbg]) (org.apache.kafka.controller.ClusterControlManager) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2062753494 I noticed I need to handle both Quorum and Broker features, which are basically the same implementation. Stay tuned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
splett2 commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1568050802 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -108,6 +103,43 @@ object StorageTool extends Logging { } } + def metadataVersionValidation(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { Review Comment: nit: `validateMetadataVersion` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org