kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499810462
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   *    A supported feature is represented by a name (string) and a range of versions (defined by a
+   *    SupportedVersionRange). It refers to a feature that a particular broker advertises support for.
+   *    Each broker advertises the version ranges of its own supported features in its own
+   *    BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   *    do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   *    range of versions.
+   *
+   * 2. Finalized feature:
+   *    A finalized feature is represented by a name (string) and a range of version levels (defined
+   *    by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is
+   *    enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   *    In comparison to a supported feature, the key difference is that a finalized feature exists
+   *    in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   *    specified range of version levels. Also, the controller is the only entity modifying the
+   *    information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all
+   *    the possible supported features finalized immediately. Assuming this is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing the entire list of
+   *    supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has now been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be
+   *    set to a higher value later. In this case, we want to start with no finalized features and
+   *    allow the user to finalize them whenever they are ready i.e. in the future whenever the
+   *    user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start
+   *    finalizing the features. This process ensures we do not enable all the possible features
+   *    immediately after an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent.
+   *        - If the node is absent, it will react by creating a FeatureZNode with disabled status
+   *          and empty finalized features.
+   *        - Otherwise, if a node already exists in enabled status then the controller will just
+   *          flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled.
+   *        - If the node is in disabled status, the controller won’t upgrade all features immediately.
+   *          Instead it will just switch the FeatureZNode status to enabled status. This lets the
+   *          user finalize the features later.
+   *        - Otherwise, if a node already exists in enabled status then the controller will leave
+   *          the node unmodified.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker
+   *    binary has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and
+   *    higher). The controller will start up and find that a FeatureZNode is already present with
+   *    enabled status and existing finalized features. In such a case, the controller leaves the node
+   *    unmodified.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled,
+                                          brokerFeatures.defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      val newFeatures = existingFeatureZNode.status match {
+        case FeatureZNodeStatus.Enabled => existingFeatureZNode.features
+        case FeatureZNodeStatus.Disabled =>
+          if (!existingFeatureZNode.features.empty()) {
+            warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" + " contains non-empty features.")
+          }
+          Features.emptyFinalizedFeatures
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures)

Review comment:
Done. I have improved it by introducing a type definition called `FeatureZNodeStatus` that points to `Value`.

IIUC you were referring to this LOC, correct?
https://github.com/apache/kafka/blob/4f96c5b424956355339dd3216c426c1c0388fe9e/core/src/main/scala/kafka/zk/ZkData.scala#L851

Here the enum `FeatureZNodeStatus` is defined and used in the same file. I thought I'd add an `import` to fix it like the below, but it felt a little unusual to add an `import` statement right above the class definition:

```
import FeatureZNodeStatus._

case class FeatureZNode(status: FeatureZNodeStatus, features: Features[FinalizedVersionRange]) {
}
```

With my recent change, it should be possible in the future to `import FeatureZNodeStatus._` within other files when referring to the enum value.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
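
For readers unfamiliar with the pattern discussed in the review comment, here is a minimal, self-contained sketch of a Scala `Enumeration` that exposes a type alias pointing to `Value`, so that an `import` of the enum's members lets the enum name be used directly as a type. The names `StatusAliasSketch`, `NodeStatus`, `Node` and `enable` are hypothetical stand-ins chosen for illustration, and the features are modelled as a plain `Map`; this is not the Kafka code.

```scala
// Hypothetical sketch; names are illustrative and not taken from the Kafka source.
object StatusAliasSketch {

  object NodeStatus extends Enumeration {
    // Alias so that, after `import NodeStatus._`, `NodeStatus` can be used as a type.
    type NodeStatus = Value
    val Disabled, Enabled = Value
  }

  // Brings the alias and the values `Disabled` / `Enabled` into scope.
  import NodeStatus._

  // Simplified node: features are modelled as name -> max version level (an assumption).
  final case class Node(status: NodeStatus, features: Map[String, Int])

  // Same shape as the match in the diff: keep the features of an enabled node,
  // start from empty features for a disabled node, and always return enabled status.
  def enable(existing: Node): Node = {
    val newFeatures = existing.status match {
      case Enabled  => existing.features
      case Disabled => Map.empty[String, Int]
    }
    Node(Enabled, newFeatures)
  }
}
```

Without the alias, the case class parameter would have to be declared as `NodeStatus.Value`; with it, other files can `import NodeStatus._` and refer to both the type and its values by short names.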