kowshik commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r427022316



##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.

Review comment:
       Done.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;

Review comment:
       Do you feel strongly about this?
   The reasons I ask are:
   1. The caller is unlikely to pass `null`.
   2. I looked over a number of other existing classes in Kafka, and there aren't any null checks for most constructor parameters.

   It would help me if you could share a couple of examples from existing code where the `null` check convention is followed in Kafka.
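   For reference, a minimal sketch of what the suggested null check could look like, using `Objects.requireNonNull`. The class body below is a trimmed, hypothetical stand-in for the code in this PR, not the actual implementation:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical minimal stand-in for the VersionRange hierarchy in this PR.
class VersionRange { }

class Features<VersionRangeType extends VersionRange> {
    private final Map<String, VersionRangeType> features;

    // Private constructor with the null check being discussed: it fails fast with a
    // clear message instead of deferring to a NullPointerException at first use.
    private Features(Map<String, VersionRangeType> features) {
        this.features = Objects.requireNonNull(features, "Provided features map cannot be null.");
    }
}
```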

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, 
Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = 
Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.all());
+    }
+
+    @Test
+    public void testGetAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = 
Features.supportedFeatures(allFeatures);
+        assertEquals(v1, features.get("feature_1"));
+        assertEquals(v2, features.get("feature_2"));
+        assertNull(features.get("nonexistent_feature"));
+    }
+
+    @Test
+    public void testSerializeDeserializeSupportedFeatures() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {

Review comment:
       Done.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {

Review comment:
       Done. Yes, I've changed it to default visibility now.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the 
same.
+     *           The returned value can be deserialized using one of the 
deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionLevelRange>.
+     *
+     * @param serialized   the serialized representation of a 
Features<VersionLevelRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<VersionLevelRange> object
+     */
+    public static Features<VersionLevelRange> deserializeFinalizedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return finalizedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> VersionLevelRange.deserialize(entry.getValue()))));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionRange>.
+     *
+     * @param serialized   the serialized representation of a 
Features<VersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<VersionRange> object
+     */
+    public static Features<VersionRange> deserializeSupportedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return supportedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> VersionRange.deserialize(entry.getValue()))));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {

Review comment:
       Done. Good point! Added test as well.

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {

Review comment:
       Done.

##########
File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
##########
@@ -0,0 +1,95 @@
+package kafka.server
+
+import org.apache.kafka.common.feature.{Features, VersionLevelRange, 
VersionRange}
+import org.junit.Assert.{assertEquals, assertThrows, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+class FinalizedFeatureCacheTest {
+
+  @Before
+  def setUp(): Unit = {
+    FinalizedFeatureCache.clear()
+    SupportedFeatures.clear()
+  }
+
+  @Test
+  def testEmpty(): Unit = {
+    assertTrue(FinalizedFeatureCache.get.isEmpty)
+  }
+
+  @Test
+  def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
+    val supportedFeatures = Map[String, VersionRange](
+      "feature_1" -> new VersionRange(1, 4))
+    
SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava))
+
+    val features = Map[String, VersionLevelRange](
+      "feature_1" -> new VersionLevelRange(1, 4))
+    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+
+    FinalizedFeatureCache.updateOrThrow(finalizedFeatures, 10)
+    assertEquals(finalizedFeatures, FinalizedFeatureCache.get.get.features)

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {
+      val errorMsg = ("FinalizedFeatureCache update failed due to invalid 
epoch in new finalized %s." +
+        " The existing finalized is %s").format(latest, existingStr)
+      throw new FeatureCacheUpdateException(errorMsg)
+    } else {
+      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      if (incompatibleFeatures.nonEmpty) {
+        val errorMsg = ("FinalizedFeatureCache updated failed since feature 
compatibility" +

Review comment:
       It is used intentionally to split the log message across 2 lines, to stay within the ~100-character-per-line readability limit. Otherwise the string literal would be very long and sit entirely on one line.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);

Review comment:
       The underlying data structure is a `Map`, so it is simpler if this method just returns `null` when the feature doesn't exist. That is how Java's `Map.get` works; here is the javadoc: https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#get-java.lang.Object-.

   Also, I've documented this method now (the doc was previously absent).
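   As a small illustration of the `Map.get`-style contract described above (a hypothetical caller, not code from this PR):

```java
import java.util.Collections;
import java.util.Map;

public class MapGetSemanticsExample {
    public static void main(String[] args) {
        // Plain java.util.Map.get semantics that Features.get() is modeled on.
        Map<String, Long> maxVersions = Collections.singletonMap("feature_1", 4L);

        System.out.println(maxVersions.get("feature_1"));           // prints: 4
        System.out.println(maxVersions.get("nonexistent_feature")); // prints: null

        // Callers are expected to handle the absent case themselves.
        Long max = maxVersions.get("nonexistent_feature");
        if (max == null) {
            System.out.println("feature not present");
        }
    }
}
```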

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);

Review comment:
       Done.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);

Review comment:
       Done. Good point!

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, 
Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {

Review comment:
       Done.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java
##########
@@ -0,0 +1,104 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ *
+ * NOTE: This is the backing class used to define the min/max versions for 
supported features.
+ */
+public class VersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_KEY_LABEL = "min_version";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_KEY_LABEL = "max_version";
+
+    private final String minKeyLabel;
+
+    private final long minValue;
+
+    private final String maxKeyLabel;
+
+    private final long maxValue;
+
+    protected VersionRange(String minKey, long minValue, String maxKeyLabel, 
long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        this.minKeyLabel = minKey;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public VersionRange(long minVersion, long maxVersion) {
+        this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, 
maxVersion);
+    }
+
+    public long min() {
+        return minValue;
+    }
+
+    public long max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%d, %d]", this.getClass().getSimpleName(), 
min(), max());
+    }
+
+    public Map<String, Long> serialize() {
+        return new HashMap<String, Long>() {
+            {
+                put(minKeyLabel, min());
+                put(maxKeyLabel, max());
+            }
+        };
+    }
+
+    public static VersionRange deserialize(Map<String, Long> serialized) {
+        return new VersionRange(
+            valueOrThrow(MIN_VERSION_KEY_LABEL, serialized),
+            valueOrThrow(MAX_VERSION_KEY_LABEL, serialized));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof VersionRange)) {
+            return false;
+        }
+
+        final VersionRange that = (VersionRange) other;
+        return Objects.equals(this.minKeyLabel, that.minKeyLabel) &&

Review comment:
       It provides slightly better convenience: `Objects.equals` also takes care of the `null` checks for you.
   Also, it turned out to be overkill to use `Objects.equals` for the primitive `minValue` and `maxValue` attributes, so I've simplified the code to use `==` for those.

   Good point!
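   A sketch of the simplified `equals` described above, condensed to just the fields in question (hypothetical; field names follow the diff quoted earlier, but this is not the exact code in the PR):

```java
import java.util.Objects;

// Condensed, hypothetical view of VersionRange showing only the equals/hashCode shape discussed.
class VersionRange {
    private final String minKeyLabel;
    private final long minValue;
    private final String maxKeyLabel;
    private final long maxValue;

    VersionRange(String minKeyLabel, long minValue, String maxKeyLabel, long maxValue) {
        this.minKeyLabel = minKeyLabel;
        this.minValue = minValue;
        this.maxKeyLabel = maxKeyLabel;
        this.maxValue = maxValue;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof VersionRange)) {
            return false;
        }
        final VersionRange that = (VersionRange) other;
        // Objects.equals handles nulls for the String labels; primitives compare with ==.
        return Objects.equals(this.minKeyLabel, that.minKeyLabel) &&
            this.minValue == that.minValue &&
            Objects.equals(this.maxKeyLabel, that.maxKeyLabel) &&
            this.maxValue == that.maxValue;
    }

    @Override
    public int hashCode() {
        return Objects.hash(minKeyLabel, minValue, maxKeyLabel, maxValue);
    }
}
```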

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the 
same.
+     *           The returned value can be deserialized using one of the 
deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionLevelRange>.

Review comment:
       Done.

##########
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##########
@@ -42,6 +42,33 @@
         "about": "The maximum supported version, inclusive." }
     ]},
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." }
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },

Review comment:
       Are you sure? All newly added fields are tagged (i.e. optional).
   Going by [this documentation](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields#KIP-482:TheKafkaProtocolshouldSupportOptionalTaggedFields-TaggedFields) in KIP-482, it is not required to bump the schema version when tagged fields are introduced.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the 
same.

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,19 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{
+    new Broker(id, endPoints, rack, emptySupportedFeatures)
+  }
 }
 
 /**
  * A Kafka broker.
- * A broker has an id, a collection of end-points, an optional rack and a 
listener to security protocol map.
+ * A broker has an id, a collection of end-points, an optional rack and a 
listener to security protocol map,

Review comment:
       Done. Good point!

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {

Review comment:
       Done. Good point!

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(

Review comment:
       Done. Good point!

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java
##########
@@ -0,0 +1,104 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ *
+ * NOTE: This is the backing class used to define the min/max versions for 
supported features.
+ */
+public class VersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_KEY_LABEL = "min_version";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_KEY_LABEL = "max_version";
+
+    private final String minKeyLabel;
+
+    private final long minValue;
+
+    private final String maxKeyLabel;
+
+    private final long maxValue;
+
+    protected VersionRange(String minKey, long minValue, String maxKeyLabel, 
long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        this.minKeyLabel = minKey;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public VersionRange(long minVersion, long maxVersion) {
+        this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, 
maxVersion);
+    }
+
+    public long min() {
+        return minValue;
+    }
+
+    public long max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%d, %d]", this.getClass().getSimpleName(), 
min(), max());
+    }
+
+    public Map<String, Long> serialize() {
+        return new HashMap<String, Long>() {
+            {
+                put(minKeyLabel, min());
+                put(maxKeyLabel, max());
+            }
+        };
+    }
+
+    public static VersionRange deserialize(Map<String, Long> serialized) {
+        return new VersionRange(
+            valueOrThrow(MIN_VERSION_KEY_LABEL, serialized),
+            valueOrThrow(MAX_VERSION_KEY_LABEL, serialized));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof VersionRange)) {

Review comment:
       Done. Also added a test. Good catch!

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);

Review comment:
       Done. Good point!

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -77,6 +93,18 @@ public boolean shouldClientThrottle(short version) {
         return version >= 2;
     }
 
+    public SupportedFeatureKey supportedFeature(String featureName) {

Review comment:
       Done.

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java
##########
@@ -0,0 +1,162 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VersionLevelRangeTest {
+
+    @Test
+    public void testCreateFailDueToInvalidParams() {
+        // min and max can't be < 1.

Review comment:
       Done. Some of it is not required. Good point, I have removed the unnecessary testing now.
   We still need to check that an exception is thrown in these 4 basic cases: min < 1, max < 1, both min & max < 1, and min > max.
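   For clarity, those four boundary cases translate to roughly the following assertions (a sketch using JUnit 4's `assertThrows` and the two-argument `VersionRange` constructor shown in this PR; the test class and method names are hypothetical):

```java
import org.junit.Test;

import static org.junit.Assert.assertThrows;

public class VersionRangeBoundaryTest {
    @Test
    public void testConstructorRejectsInvalidBounds() {
        // min < 1
        assertThrows(IllegalArgumentException.class, () -> new VersionRange(0, 1));
        // max < 1
        assertThrows(IllegalArgumentException.class, () -> new VersionRange(1, 0));
        // both min and max < 1
        assertThrows(IllegalArgumentException.class, () -> new VersionRange(0, 0));
        // min > max
        assertThrows(IllegalArgumentException.class, () -> new VersionRange(2, 1));
    }
}
```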

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, 
Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = 
Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.all());
+    }
+
+    @Test
+    public void testGetAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {

Review comment:
       Done.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+

Review comment:
       Done. Removed it.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {

Review comment:
       Done. It was unused and I have eliminated it now.

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,19 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{

Review comment:
       No, this constructor overload was simply created to avoid churn in test 
code at a number of places that would otherwise have to pass the additional 
`SupportedFeatures` parameter. How do you feel about keeping it?
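
   For illustration, a minimal sketch of what such a delegating overload could 
look like (the exact shape of the primary `Broker` constructor is assumed here, 
not taken from this diff):

       // Hypothetical sketch: plug in empty supported features so existing call
       // sites (mostly tests) need not pass the new parameter explicitly.
       def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = {
         Broker(id, endPoints, rack, Features.emptySupportedFeatures())
       }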

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<VersionRange> latestSupportedFeatures,
+        Optional<Features<VersionLevelRange>> finalizedFeatures,
+        Optional<Long> finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == 
DEFAULT_THROTTLE_TIME) {
             return DEFAULT_API_VERSIONS_RESPONSE;
         }
-        return createApiVersionsResponse(throttleTimeMs, maxMagic);
+        return createApiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
finalizedFeatures, finalizedFeaturesEpoch);
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int 
throttleTimeMs, final byte minMagic) {
+    public static ApiVersionsResponse createApiVersionsResponse(

Review comment:
       The tests have already been added. Please check out the tests added in 
`ApiVersionsResponseTest.java`, particularly 
`shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle`.
   
   Let me know if this test does not look sufficient.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/VersionRangeTest.java
##########
@@ -0,0 +1,150 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class VersionRangeTest {
+    @Test
+    public void testFailDueToInvalidParams() {
+        // min and max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(0, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(-1, -1));
+        // min can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(0, 1));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(-1, 1));
+        // max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(1, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(1, -1));
+        // min can't be > max.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(2, 1));
+    }
+
+    @Test
+    public void testSerializeDeserializeTest() {
+        VersionRange versionRange = new VersionRange(1, 2);
+        assertEquals(1, versionRange.min());
+        assertEquals(2, versionRange.max());
+
+        Map<String, Long> serialized = versionRange.serialize();
+        assertEquals(
+            new HashMap<String, Long>() {
+                {
+                    put("min_version", versionRange.min());
+                    put("max_version", versionRange.max());
+                }
+            },
+            serialized
+        );
+
+        VersionRange deserialized = VersionRange.deserialize(serialized);
+        assertEquals(1, deserialized.min());
+        assertEquals(2, deserialized.max());
+        assertEquals(versionRange, deserialized);
+    }
+
+    @Test
+    public void testDeserializationFailureTest() {
+        // min_version can't be < 1.
+        Map<String, Long> invalidWithBadMinVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 0L);
+                put("max_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithBadMinVersion));
+
+        // max_version can't be < 1.
+        Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 1L);
+                put("max_version", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithBadMaxVersion));
+
+        // min_version and max_version can't be < 1.
+        Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 0L);
+                put("max_version", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithBadMinMaxVersion));
+
+        // min_version can't be > max_version.
+        Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 2L);
+                put("max_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithLowerMaxVersion));
+
+        // min_version key missing.
+        Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("max_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithMinKeyMissing));
+
+        // max_version key missing.
+        Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithMaxKeyMissing));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("VersionRange[1, 1]", new VersionRange(1, 1).toString());
+        assertEquals("VersionRange[1, 2]", new VersionRange(1, 2).toString());
+    }
+
+    @Test
+    public void testEquals() {
+        assertTrue(new VersionRange(1, 1).equals(new VersionRange(1, 1)));
+        assertFalse(new VersionRange(1, 1).equals(new VersionRange(1, 2)));
+    }
+
+    @Test
+    public void testGetters() {

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[VersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[VersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible between the 
latest features supported
+   * by the Broker, and the provided cluster-wide finalized features.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned 
set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[VersionLevelRange]): 
Set[String] = {
+    val supported = get
+
+    val incompatibilities = finalized.all.asScala.collect {
+      case (feature, versionLevels) => {
+        val supportedVersions = supported.get(feature);
+        if (supportedVersions == null) {
+          (feature, "{feature=%s, reason='Unsupported 
feature'}".format(feature))
+        } else if (!versionLevels.isCompatibleWith(supportedVersions)) {

Review comment:
       Done.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java
##########
@@ -0,0 +1,162 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VersionLevelRangeTest {
+
+    @Test
+    public void testCreateFailDueToInvalidParams() {
+        // min and max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(0, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(-1, -1));
+        // min can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(0, 1));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(-1, 1));
+        // max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(1, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(1, -1));
+        // min can't be > max.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(2, 1));
+    }
+
+    @Test
+    public void testSerializeDeserialize() {
+        VersionLevelRange versionLevelRange = new VersionLevelRange(1, 2);
+        assertEquals(1, versionLevelRange.min());
+        assertEquals(2, versionLevelRange.max());
+
+        Map<String, Long> serialized = versionLevelRange.serialize();
+        assertEquals(
+            new HashMap<String, Long>() {
+                {
+                    put("min_version_level", versionLevelRange.min());
+                    put("max_version_level", versionLevelRange.max());
+                }
+            },
+            serialized
+        );
+
+        VersionLevelRange deserialized = 
VersionLevelRange.deserialize(serialized);
+        assertEquals(1, deserialized.min());
+        assertEquals(2, deserialized.max());
+        assertEquals(versionLevelRange, deserialized);
+    }
+
+    @Test
+    public void testDeserializationFailureTest() {
+        // min_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMinVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 0L);
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMinVersion));
+
+        // max_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 1L);
+                put("max_version_level", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMaxVersion));
+
+        // min_version_level and max_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 0L);
+                put("max_version_level", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMinMaxVersion));
+
+        // min_version_level can't be > max_version_level.
+        Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 2L);
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithLowerMaxVersion));
+
+        // min_version_level key missing.
+        Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithMinKeyMissing));
+
+        // max_version_level key missing.
+        Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithMaxKeyMissing));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("VersionLevelRange[1, 1]", new VersionLevelRange(1, 
1).toString());
+        assertEquals("VersionLevelRange[1, 2]", new VersionLevelRange(1, 
2).toString());
+    }
+
+    @Test
+    public void testEquals() {
+        assertTrue(new VersionLevelRange(1, 1).equals(new VersionLevelRange(1, 
1)));
+        assertFalse(new VersionLevelRange(1, 1).equals(new 
VersionLevelRange(1, 2)));
+    }
+
+    @Test
+    public void testIsCompatibleWith() {
+        assertTrue(new VersionLevelRange(1, 1).isCompatibleWith(new 
VersionRange(1, 1)));
+        assertTrue(new VersionLevelRange(2, 3).isCompatibleWith(new 
VersionRange(1, 4)));
+        assertTrue(new VersionLevelRange(1, 4).isCompatibleWith(new 
VersionRange(1, 4)));
+
+        assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new 
VersionRange(2, 3)));
+        assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new 
VersionRange(2, 4)));
+        assertFalse(new VersionLevelRange(2, 4).isCompatibleWith(new 
VersionRange(2, 3)));
+    }
+
+    @Test
+    public void testGetters() {
+        assertEquals(1, new VersionLevelRange(1, 2).min());
+        assertEquals(2, new VersionLevelRange(1, 2).max());
+    }
+}

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -81,17 +83,27 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json 
format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or 
above.
+   * - Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
+   *   the apiVersion is 0.10.0.X or above but less than 2.6.x.
+   * - Register the broker with v2 json format otherwise.
    *
    * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON 
version is above 2.
    *
-   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 
to 0.10.0.X or above without having to
-   * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any 
case).
+   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 
to 0.10.0.X or above
+   * without having to upgrade to 0.9.0.1 first (clients have to be upgraded 
to 0.9.0.1 in
+   * any case).
    */
   def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo 
= {
-    // see method documentation for the reason why we do this
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    val version = {

Review comment:
       Done. Good point!
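
   Based on the doc comment above, the cut-off block presumably resolves to 
something along these lines (a sketch, not the actual diff contents):

       val version = {
         if (apiVersion >= KAFKA_2_6_IV1) 5          // v5 adds the features field
         else if (apiVersion >= KAFKA_0_10_0_IV1) 4  // v4 adds multiple endpoints and rack
         else 2                                      // v2 keeps 0.9.0.0 compatibility (KAFKA-3100)
       }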

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[VersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[VersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible between the 
latest features supported
+   * by the Broker, and the provided cluster-wide finalized features.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned 
set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[VersionLevelRange]): 
Set[String] = {
+    val supported = get

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[VersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[VersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible between the 
latest features supported
+   * by the Broker, and the provided cluster-wide finalized features.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.

Review comment:
       Good point. I have improved the doc now. Let me know how you feel about 
it.

##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     createRecursive(path, data = null, throwIfPathExists = false)
   }
 
+  // Visible for testing.
+  def createFeatureZNode(nodeContents: FeatureZNode): Unit = {

Review comment:
       Yes, this will get used in the future. For example, the write path will 
use it.
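
   A hypothetical future call site might look like the following (the exact 
`FeatureZNode` constructor shape is an assumption here):

       // Sketch: a write path would publish the finalized features under the feature ZK node.
       zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures))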

##########
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##########
@@ -149,6 +153,53 @@ class BrokerEndPointTest {
     assertEquals(None, broker.rack)
   }
 
+  @Test
+  def testFromJsonV5(): Unit = {

Review comment:
       In my understanding, this is an impossible case, because we only write 
features into the JSON in v5 or above. That is why there is no test for it. Let 
me know how you feel about it.

##########
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##########
@@ -149,6 +153,53 @@ class BrokerEndPointTest {
     assertEquals(None, broker.rack)
   }
 
+  @Test
+  def testFromJsonV5(): Unit = {
+    val json = """{
+      "version":5,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"2233345666",
+      "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+      "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
+      "rack":"dc1",
+      "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+    }"""
+    val broker = parseBrokerJson(1, json)
+    assertEquals(1, broker.id)
+    val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
+    assertEquals("host1", brokerEndPoint.host)
+    assertEquals(9092, brokerEndPoint.port)
+    assertEquals(Some("dc1"), broker.rack)
+    assertEquals(Features.supportedFeatures(
+      Map[String, VersionRange](
+        "feature1" -> new VersionRange(1, 2),
+        "feature2" -> new VersionRange(2, 4)).asJava),
+      broker.features)
+  }
+
+  @Test
+  def testFromJsonV4WithNoFeatures(): Unit = {

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {
+      val errorMsg = ("FinalizedFeatureCache update failed due to invalid 
epoch in new finalized %s." +
+        " The existing finalized is %s").format(latest, existingStr)
+      throw new FeatureCacheUpdateException(errorMsg)
+    } else {
+      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      if (incompatibleFeatures.nonEmpty) {
+        val errorMsg = ("FinalizedFeatureCache updated failed since feature 
compatibility" +
+          " checks failed! Supported %s has incompatibilities with the latest 
finalized %s." +
+          " The incompatible features are: %s.").format(
+          SupportedFeatures.get, latest, incompatibleFeatures)
+        throw new FeatureCacheUpdateException(errorMsg)
+      }
+    }
+    val logMsg = "Updated cache from existing finalized %s to latest finalized 
%s".format(

Review comment:
       It is used intentionally to split the log message across two source lines 
(to stay within the ~100-character readability limit per line). Otherwise the 
string literal would be very long and sit entirely on one line.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -225,7 +255,12 @@ object BrokerIdZNode {
           }
 
         val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
-        BrokerInfo(Broker(id, endpoints, rack), version, jmxPort)
+        val features = FeatureZNode.asJavaMap(brokerInfo

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,200 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to provide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milliseconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      processNotification()

Review comment:
       I have now added comments to the code.
   
   The idea I had was that this event may happen, though rarely (e.g. due to an 
operational error). In such a case, we do not want to kill the brokers, so we 
just log a warning, treat the case as if the node is absent, and populate the 
cache with empty features.
   
   So, this case is actually handled inside 
`FeatureCacheUpdater.updateLatestOrThrow()`: the call that reads the ZK node 
returns `ZkVersion.UnknownVersion` whenever the node does not exist in ZK, and 
I've explicitly handled that returned version.
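
   Condensed from the diff above, the relevant branch is:

       val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
       if (version == ZkVersion.UnknownVersion) {
         // A deleted or absent node is treated as "no finalized features", not as a fatal error.
         FinalizedFeatureCache.clear()
       }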

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.

Review comment:
       As we discussed over Slack today, this exception is already handled in 
the `ChangeNotificationProcessorThread.doWork()` method defined in 
`FinalizedFeatureChangeListener.scala`. Basically, the ZK change notification 
processor thread exits the Broker with a fatal error (non-zero exit code) when 
this exception (or any exception) is caught while trying to update the 
`FinalizedFeatureCache`.
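
   For reference, the handling in `doWork()` (quoted earlier in this thread) 
boils down to:

       try {
         queue.take.updateLatestOrThrow()
       } catch {
         case e: InterruptedException => info("Interrupted", e)
         case e: Exception =>
           error("Failed to process feature ZK node change event. The broker will exit.", e)
           throw new FatalExitError(1)  // non-zero exit code: the broker process terminates
       }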

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         /* setup zookeeper */
         initZkClient(time)
 
+        /* initialize features */
+        _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient)
+        if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) {
+          // The feature versioning system (KIP-584) is active only when:
+          // config.interBrokerProtocolVersion is >= KAFKA_2_6_IV1.
+          _featureChangeListener.initOrThrow(60000)

Review comment:
       Done.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/VersionLevelRange.java
##########
@@ -0,0 +1,39 @@
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of VersionRange representing a range of version levels. 
The main specialization
+ * is that the class uses different serialization keys for min/max attributes.
+ *
+ * NOTE: This is the backing class used to define the min/max version levels 
for finalized features.
+ */
+public class VersionLevelRange extends VersionRange {

Review comment:
       Done. Good point!
    - I have now created 3 classes as you proposed: `BaseVersionRange` is the 
base class, and `SupportedVersionRange` & `FinalizedVersionRange` are its 
child classes (rough shape sketched below).
    - The key labels couldn't be made into abstract functions, since these 
constants are needed within `deserialize()`, which is a static method defined 
in the child classes.
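
   Roughly, the resulting shape is the following compilable sketch (not the 
actual Kafka code; validation, equals/hashCode and toString are omitted, and 
the field layout is assumed):

       import java.util.HashMap;
       import java.util.Map;

       // Base class holds min/max plus the serialization key names supplied by each subclass.
       class BaseVersionRange {
           private final String minKey;
           private final String maxKey;
           private final long min;
           private final long max;

           protected BaseVersionRange(String minKey, String maxKey, long min, long max) {
               this.minKey = minKey;
               this.maxKey = maxKey;
               this.min = min;
               this.max = max;
           }

           public long min() { return min; }
           public long max() { return max; }

           public Map<String, Long> serialize() {
               Map<String, Long> map = new HashMap<>();
               map.put(minKey, min);
               map.put(maxKey, max);
               return map;
           }
       }

       class SupportedVersionRange extends BaseVersionRange {
           public SupportedVersionRange(long min, long max) {
               super("min_version", "max_version", min, max);
           }

           // deserialize() must be static, so the key names appear as plain literals here
           // rather than as abstract methods on the base class.
           public static SupportedVersionRange deserialize(Map<String, Long> map) {
               return new SupportedVersionRange(map.get("min_version"), map.get("max_version"));
           }
       }

       class FinalizedVersionRange extends BaseVersionRange {
           public FinalizedVersionRange(long min, long max) {
               super("min_version_level", "max_version_level", min, max);
           }

           public static FinalizedVersionRange deserialize(Map<String, Long> map) {
               return new FinalizedVersionRange(
                   map.get("min_version_level"), map.get("max_version_level"));
           }
       }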




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

