junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r467279790



##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
       The KIP wiki has AllowDowngrade at the top level. Could we update that?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises
+   * support for. Each broker advertises the version ranges of its own supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the only entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all
+   *    the possible supported features finalized immediately. Assuming this is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. This process ensures we do not enable all the possible features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already present with enabled
+   *    status and existing finalized features. In such a case, the controller needs to scan the
+   *    existing finalized features and mutate them for the purpose of version level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is present in the default
+   *    finalized features, then, its existing minimum version level is updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range of feature version
+   *    levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, which requires a major
+   *    release of Kafka. In such a release, the minimum version level maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map {
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with $existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= existingVersionRange.max()) {
+              // Using the change below, we deprecate all version levels in the range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1].
+              //
+              // NOTE: if existingVersionRange.min() equals brokerDefaultVersionRange.min(), then
+              // we do not deprecate any version levels (since there is none to be deprecated).
+              //
+              // Examples:
+              // 1. brokerDefaultVersionRange = [4, 7] and existingVersionRange = [1, 5].
+              //    In this case, we deprecate all version levels in the range: [1, 3].
+              // 2. brokerDefaultVersionRange = [4, 7] and existingVersionRange = [4, 5].
+              //    In this case, we do not deprecate any version levels since
+              //    brokerDefaultVersionRange.min() equals existingVersionRange.min().
+              (featureName, new FinalizedVersionRange(brokerDefaultVersionRange.min(), existingVersionRange.max()))

Review comment:
       When we roll the cluster to bump up IBP, it seems that it's possible for the min of the finalized version to flip repeatedly? This can be a bit weird.
   
   Also, it seems that we should set the min version based on the largest min version across all brokers?
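
   For illustration, a minimal sketch of that suggestion (Java; the helper and its names are illustrative, not from the PR): derive the finalized min version level from the mins advertised by all live brokers, so a rolling IBP bump can only move it forward.

```java
import java.util.Collection;

// Hypothetical helper, not part of the PR: picks the finalized min version
// level as the largest min version advertised across all live brokers.
final class FinalizedMinVersionSketch {
    static short stableMinVersionLevel(Collection<Short> brokerMinVersionLevels) {
        short result = 1;
        for (short min : brokerMinVersionLevels) {
            if (min > result) {
                result = min;  // a rolling bounce can only move this forward
            }
        }
        return result;
    }
}
```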

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging {
         " The existing cache contents are %s").format(latest, 
oldFeatureAndEpoch)
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
-      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      val incompatibleFeatures = 
brokerFeatures.incompatibleFeatures(latest.features)
       if (!incompatibleFeatures.empty) {
         val errorMsg = ("FinalizedFeatureCache update failed since feature 
compatibility" +
           " checks failed! Supported %s has incompatibilities with the latest 
%s."
-          ).format(SupportedFeatures.get, latest)
+          ).format(brokerFeatures.supportedFeatures, latest)

Review comment:
       If the broker discovers that it's incompatible, should it just shut itself down?
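
   For illustration, a rough sketch of that alternative (Java; the helper is purely hypothetical and not part of the PR): instead of throwing FeatureCacheUpdateException, a broker that detects an incompatibility terminates itself.

```java
import org.apache.kafka.common.utils.Exit;

// Hypothetical helper, not part of the PR: fail fast when the broker's
// supported features are incompatible with the latest finalized features.
final class IncompatibilityShutdownSketch {
    static void failFastOnIncompatibility(boolean hasIncompatibleFeatures, String details) {
        if (hasIncompatibleFeatures) {
            System.err.println("Exiting: feature incompatibility detected: " + details);
            Exit.exit(1);  // shut the broker down rather than keep serving
        }
    }
}
```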

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises
+   * support for. Each broker advertises the version ranges of its own supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the only entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.

Review comment:
       When we roll the cluster to bump up IBP, it seems that it's possible for status to be enabled and then disabled repeatedly? This can be a bit weird.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
   * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
       Hmm, do we need to do this? If there is an incompatible feature, the broker will realize that and can just shut itself down.

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+      "about": "Results for each feature update.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
       The KIP wiki doesn't include this field.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+
+    val incompatibilityError = "Could not apply finalized feature update because" +
+      " brokers were found to have incompatible versions for the feature."
+
+    if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError))
+    } else {
+      val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature)
+      val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel)

Review comment:
       If update.maxVersionLevel < defaultMinVersionLevel, we throw an IllegalStateException. Should we catch it and convert it to an error code?
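
   For illustration, a rough sketch of that conversion (Java; the helper is hypothetical, not from the PR). Whether the failed construction surfaces as IllegalStateException or IllegalArgumentException should be confirmed against the PR, so the sketch catches RuntimeException.

```java
import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;

// Hypothetical helper, not part of the PR: map a failed range construction
// (e.g. maxVersionLevel < defaultMinVersionLevel) to INVALID_REQUEST.
final class VersionRangeValidationSketch {
    static ApiError validateRange(short defaultMinVersionLevel, short maxVersionLevel) {
        try {
            new FinalizedVersionRange(defaultMinVersionLevel, maxVersionLevel);
            return ApiError.NONE;
        } catch (RuntimeException e) {
            return new ApiError(Errors.INVALID_REQUEST,
                "Could not create a valid FinalizedVersionRange: " + e.getMessage());
        }
    }
}
```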

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature

Review comment:
       Could you explain how the default min version is different from the min in supportedFeatures?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+
+    val incompatibilityError = "Could not apply finalized feature update because" +
+      " brokers were found to have incompatible versions for the feature."
+
+    if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {

Review comment:
       Since we are doing the compatibility check for every broker, do we need to special case here just for the broker feature on the controller?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Features<FinalizedVersionRange> finalizedFeatures,
+                           final int finalizedFeaturesEpoch,
+                           final Features<SupportedVersionRange> supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {

Review comment:
       The return type is different from the KIP. Which one is correct? Since this is a public interface, in general, we don't want to expose anything other than what's truly necessary. This PR seems to expose a lot more public methods to the user.
   
   FinalizedVersionRange is in org.apache.kafka.common.feature. Currently, all public interfaces are specified under javadoc in build.gradle. So, we need to either include that package in javadoc or move it to a public package.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Features<FinalizedVersionRange> finalizedFeatures,
+                           final int finalizedFeaturesEpoch,
+                           final Features<SupportedVersionRange> supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    /**
+     * The epoch for the finalized features.
+     * If the returned value is empty, it means the finalized features are absent/unavailable.
+     */
+    public Optional<Integer> finalizedFeaturesEpoch() {

Review comment:
       The return type is different from the KIP. Which one is correct?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.

Review comment:
       The KIP also exposes host() and port(). Are they still needed?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be added or updated or
+     * deleted, along with the new max feature version level value. This request is issued only to
+     * the controller since the API is only served by the controller. The return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
+     * the max version level to be less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link FeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the map of finalized feature name to {@link FeatureUpdate}
+     * @param options          the options to use
+     *
+     * @return                 the {@link UpdateFeaturesResult} containing the result
+     */
+    UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

Review comment:
       Again, this method has a different signature from the KIP.
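
   For context, a minimal usage sketch of the API as documented above. It assumes the FeatureUpdate constructor takes (maxVersionLevel, allowDowngrade) and that per-feature futures are exposed via UpdateFeaturesResult#values(), as in this PR rather than the KIP.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;

public final class UpdateFeaturesExample {
    public static void main(String[] args) throws Exception {
        final Map<String, Object> config = Collections.singletonMap(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(config)) {
            // Bump "feature_1" to max version level 2; no downgrade intended.
            final Map<String, FeatureUpdate> updates = Collections.singletonMap(
                "feature_1", new FeatureUpdate((short) 2, false));
            final UpdateFeaturesResult result =
                admin.updateFeatures(updates, new UpdateFeaturesOptions());
            // The operation is not transactional: each feature's update
            // succeeds or fails independently via its own future.
            result.values().get("feature_1").get();
        }
    }
}
```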

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
       The KIP doesn't have DescribeFeaturesOptions. If we are changing the KIP, could we summarize the list of the things that are changed?
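
   For context, a minimal usage sketch of the describe path as documented above. The DescribeFeaturesOptions#sendRequestToController(boolean) setter and the DescribeFeaturesResult#featureMetadata() accessor are assumptions based on this PR, not on the KIP.

```java
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;

public final class DescribeFeaturesExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Collections.singletonMap(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // Route the request to the controller for a strongly consistent read.
            final DescribeFeaturesOptions options =
                new DescribeFeaturesOptions().sendRequestToController(true);
            final FeatureMetadata metadata =
                admin.describeFeatures(options).featureMetadata().get();
            System.out.println("Finalized features: " + metadata.finalizedFeatures());
            System.out.println("Supported features: " + metadata.supportedFeatures());
        }
    }
}
```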

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null");
+
+        final UpdateFeaturesRequestData request = UpdateFeaturesRequest.create(featureUpdates);
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
+        for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();

Review comment:
       handleNotControllerError() already throws an exception.
   
   Should other errors like CLUSTER_AUTHORIZATION_FAILED be treated in the same way?
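
   For illustration, one possible shape of the error routing this comment suggests (Java; the helper and map layout are illustrative, not the PR's exact structure).

```java
import java.util.Map;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical helper, not part of the PR: request-level errors fail every
// pending future, while any other error fails only its own feature's future.
final class UpdateFeaturesErrorRoutingSketch {
    static void route(String feature, Errors error, Map<String, KafkaFutureImpl<Void>> updateFutures) {
        if (error == Errors.NONE) {
            updateFutures.get(feature).complete(null);
        } else if (error == Errors.NOT_CONTROLLER) {
            // Let the admin client retry against the refreshed controller;
            // handleNotControllerError() in the PR already throws for this case.
            throw error.exception();
        } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
            // The whole request was rejected, so no per-feature result is meaningful.
            updateFutures.values().forEach(f -> f.completeExceptionally(error.exception()));
        } else {
            updateFutures.get(feature).completeExceptionally(error.exception());
        }
    }
}
```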

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesResult.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class UpdateFeaturesResult {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    /**
+     * @param futures   a map from feature names to future, which can be used to check the status of
+     *                  individual feature updates.
+     */
+    public UpdateFeaturesResult(final Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<String, KafkaFuture<Void>> values() {

Review comment:
       The KIP doesn't have this method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

