abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r426884892



##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/VersionLevelRange.java
##########
@@ -0,0 +1,39 @@
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of VersionRange representing a range of version levels. 
The main specialization
+ * is that the class uses different serialization keys for min/max attributes.
+ *
+ * NOTE: This is the backing class used to define the min/max version levels 
for finalized features.
+ */
+public class VersionLevelRange extends VersionRange {

Review comment:
       In terms of naming, do you think `FinalizedVersionRange` is more 
explicit? Also when I look closer at the class hierarchy, I feel the sharing 
point between finalized version range and supported version range should be 
extracted to avoid weird inheritance. What I'm proposing is to have 
`VersionRange` as a super class with two subclasses: `SupportedVersionRange` 
and `FinalizedVersionRange`, and make `minKeyLabel` and `maxKeyLabel` abstract 
functions, WDYT?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/VersionRangeTest.java
##########
@@ -0,0 +1,150 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class VersionRangeTest {
+    @Test
+    public void testFailDueToInvalidParams() {
+        // min and max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(0, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(-1, -1));
+        // min can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(0, 1));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(-1, 1));
+        // max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(1, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(1, -1));
+        // min can't be > max.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionRange(2, 1));
+    }
+
+    @Test
+    public void testSerializeDeserializeTest() {
+        VersionRange versionRange = new VersionRange(1, 2);
+        assertEquals(1, versionRange.min());
+        assertEquals(2, versionRange.max());
+
+        Map<String, Long> serialized = versionRange.serialize();
+        assertEquals(
+            new HashMap<String, Long>() {
+                {
+                    put("min_version", versionRange.min());
+                    put("max_version", versionRange.max());
+                }
+            },
+            serialized
+        );
+
+        VersionRange deserialized = VersionRange.deserialize(serialized);
+        assertEquals(1, deserialized.min());
+        assertEquals(2, deserialized.max());
+        assertEquals(versionRange, deserialized);
+    }
+
+    @Test
+    public void testDeserializationFailureTest() {
+        // min_version can't be < 1.
+        Map<String, Long> invalidWithBadMinVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 0L);
+                put("max_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithBadMinVersion));
+
+        // max_version can't be < 1.
+        Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 1L);
+                put("max_version", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithBadMaxVersion));
+
+        // min_version and max_version can't be < 1.
+        Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 0L);
+                put("max_version", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithBadMinMaxVersion));
+
+        // min_version can't be > max_version.
+        Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 2L);
+                put("max_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithLowerMaxVersion));
+
+        // min_version key missing.
+        Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("max_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithMinKeyMissing));
+
+        // max_version key missing.
+        Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("min_version", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionRange.deserialize(invalidWithMaxKeyMissing));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("VersionRange[1, 1]", new VersionRange(1, 1).toString());
+        assertEquals("VersionRange[1, 2]", new VersionRange(1, 2).toString());
+    }
+
+    @Test
+    public void testEquals() {
+        assertTrue(new VersionRange(1, 1).equals(new VersionRange(1, 1)));
+        assertFalse(new VersionRange(1, 1).equals(new VersionRange(1, 2)));
+    }
+
+    @Test
+    public void testGetters() {

Review comment:
       nit: testMinMax, and we could reuse the same `new VersionRange(1, 2)` by 
only creating it once. 

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java
##########
@@ -0,0 +1,104 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ *
+ * NOTE: This is the backing class used to define the min/max versions for 
supported features.
+ */
+public class VersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_KEY_LABEL = "min_version";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_KEY_LABEL = "max_version";
+
+    private final String minKeyLabel;
+
+    private final long minValue;
+
+    private final String maxKeyLabel;
+
+    private final long maxValue;
+
+    protected VersionRange(String minKey, long minValue, String maxKeyLabel, 
long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        this.minKeyLabel = minKey;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public VersionRange(long minVersion, long maxVersion) {
+        this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, 
maxVersion);
+    }
+
+    public long min() {
+        return minValue;
+    }
+
+    public long max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%d, %d]", this.getClass().getSimpleName(), 
min(), max());
+    }
+
+    public Map<String, Long> serialize() {
+        return new HashMap<String, Long>() {
+            {
+                put(minKeyLabel, min());
+                put(maxKeyLabel, max());
+            }
+        };
+    }
+
+    public static VersionRange deserialize(Map<String, Long> serialized) {
+        return new VersionRange(
+            valueOrThrow(MIN_VERSION_KEY_LABEL, serialized),
+            valueOrThrow(MAX_VERSION_KEY_LABEL, serialized));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof VersionRange)) {

Review comment:
       Need to check null

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.

Review comment:
       Could we add a reference to the class?

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -81,17 +83,27 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json 
format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or 
above.
+   * - Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
+   *   the apiVersion is 0.10.0.X or above but lesser than 2.6.x.
+   * - Register the broker with v2 json format otherwise.
    *
    * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON 
version is above 2.
    *
-   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 
to 0.10.0.X or above without having to
-   * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any 
case).
+   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 
to 0.10.0.X or above
+   * without having to upgrade to 0.9.0.1 first (clients have to be upgraded 
to 0.9.0.1 in
+   * any case).
    */
   def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo 
= {
-    // see method documentation for the reason why we do this
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    val version = {

Review comment:
       I don't think we need a nested if-else:
   ```
    val version = {
         if (apiVersion >= KAFKA_2_6_IV1) 
           5
         else if (apiVersion >= KAFKA_0_10_0_IV1)
           4
         else
           2
       }
   ```

##########
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##########
@@ -149,6 +153,53 @@ class BrokerEndPointTest {
     assertEquals(None, broker.rack)
   }
 
+  @Test
+  def testFromJsonV5(): Unit = {
+    val json = """{
+      "version":5,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"2233345666",
+      "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+      "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
+      "rack":"dc1",
+      "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+    }"""
+    val broker = parseBrokerJson(1, json)
+    assertEquals(1, broker.id)
+    val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
+    assertEquals("host1", brokerEndPoint.host)
+    assertEquals(9092, brokerEndPoint.port)
+    assertEquals(Some("dc1"), broker.rack)
+    assertEquals(Features.supportedFeatures(
+      Map[String, VersionRange](
+        "feature1" -> new VersionRange(1, 2),
+        "feature2" -> new VersionRange(2, 4)).asJava),
+      broker.features)
+  }
+
+  @Test
+  def testFromJsonV4WithNoFeatures(): Unit = {

Review comment:
       nit: This test could move closer to testFromJsonV4WithNoRack

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {
+      val errorMsg = ("FinalizedFeatureCache update failed due to invalid 
epoch in new finalized %s." +
+        " The existing finalized is %s").format(latest, existingStr)
+      throw new FeatureCacheUpdateException(errorMsg)
+    } else {
+      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      if (incompatibleFeatures.nonEmpty) {
+        val errorMsg = ("FinalizedFeatureCache updated failed since feature 
compatibility" +

Review comment:
       nit: this errorMsg val seems redundant.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,200 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      processNotification()

Review comment:
       Does this event actually happen? Will we hit illegal state exception in 
`updateLatestOrThrow`?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(

Review comment:
       Note this function is public, which suggests there could be external 
dependency that we need to take care of. The safer approach is to keep this 
static function and create a separate one with augmented parameters. cc @ijuma 
for validation.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {

Review comment:
       I gave it more thought, and wonder whether we could just call this 
function `features` to be more consistent with our convention for getters.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java
##########
@@ -0,0 +1,162 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VersionLevelRangeTest {
+
+    @Test
+    public void testCreateFailDueToInvalidParams() {
+        // min and max can't be < 1.

Review comment:
       Does L17-23 really necessary for testing?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java
##########
@@ -0,0 +1,104 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ *
+ * NOTE: This is the backing class used to define the min/max versions for 
supported features.
+ */
+public class VersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_KEY_LABEL = "min_version";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_KEY_LABEL = "max_version";
+
+    private final String minKeyLabel;
+
+    private final long minValue;
+
+    private final String maxKeyLabel;
+
+    private final long maxValue;
+
+    protected VersionRange(String minKey, long minValue, String maxKeyLabel, 
long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        this.minKeyLabel = minKey;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public VersionRange(long minVersion, long maxVersion) {
+        this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, 
maxVersion);
+    }
+
+    public long min() {
+        return minValue;
+    }
+
+    public long max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%d, %d]", this.getClass().getSimpleName(), 
min(), max());
+    }
+
+    public Map<String, Long> serialize() {
+        return new HashMap<String, Long>() {
+            {
+                put(minKeyLabel, min());
+                put(maxKeyLabel, max());
+            }
+        };
+    }
+
+    public static VersionRange deserialize(Map<String, Long> serialized) {
+        return new VersionRange(
+            valueOrThrow(MIN_VERSION_KEY_LABEL, serialized),
+            valueOrThrow(MAX_VERSION_KEY_LABEL, serialized));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof VersionRange)) {
+            return false;
+        }
+
+        final VersionRange that = (VersionRange) other;
+        return Objects.equals(this.minKeyLabel, that.minKeyLabel) &&

Review comment:
       Is there a difference between `Objects.equals` and 
`this.minKeyLabel.equals(that.minKeyLabel)`?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -77,6 +93,18 @@ public boolean shouldClientThrottle(short version) {
         return version >= 2;
     }
 
+    public SupportedFeatureKey supportedFeature(String featureName) {

Review comment:
       I think we could delay the addition for these helpers until we actually 
need them.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {

Review comment:
       Is this function being used?

##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     createRecursive(path, data = null, throwIfPathExists = false)
   }
 
+  // Visible for testing.
+  def createFeatureZNode(nodeContents: FeatureZNode): Unit = {

Review comment:
       Do you expect these helper functions actually to be used in production 
logic with subsequent PRs? 

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.

Review comment:
       It seems that we don't have the handling logic for this 
FeatureCacheUpdateException. Do we think this is fatal?

##########
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##########
@@ -149,6 +153,53 @@ class BrokerEndPointTest {
     assertEquals(None, broker.rack)
   }
 
+  @Test
+  def testFromJsonV5(): Unit = {

Review comment:
       What would happen if we are dealing with a V4 json map containing 
features?

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -225,7 +255,12 @@ object BrokerIdZNode {
           }
 
         val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
-        BrokerInfo(Broker(id, endpoints, rack), version, jmxPort)
+        val features = FeatureZNode.asJavaMap(brokerInfo

Review comment:
       Could we make feature extraction as a helper function?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.

Review comment:
       Might worth getting a ticket to define the handling strategy for such 
exception, and in general how `updateOrThrow` will be used. 

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")

Review comment:
       s/existingStr/oldFeatureAndEpoch

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[VersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[VersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible between the 
latest features supported
+   * by the Broker, and the provided cluster-wide finalized features.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned 
set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[VersionLevelRange]): 
Set[String] = {
+    val supported = get

Review comment:
       This is only used on L53, maybe we could just use supportedFeatures 
instead

##########
File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
##########
@@ -0,0 +1,95 @@
+package kafka.server
+
+import org.apache.kafka.common.feature.{Features, VersionLevelRange, 
VersionRange}
+import org.junit.Assert.{assertEquals, assertThrows, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+class FinalizedFeatureCacheTest {
+
+  @Before
+  def setUp(): Unit = {
+    FinalizedFeatureCache.clear()
+    SupportedFeatures.clear()
+  }
+
+  @Test
+  def testEmpty(): Unit = {
+    assertTrue(FinalizedFeatureCache.get.isEmpty)
+  }
+
+  @Test
+  def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
+    val supportedFeatures = Map[String, VersionRange](
+      "feature_1" -> new VersionRange(1, 4))
+    
SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava))
+
+    val features = Map[String, VersionLevelRange](
+      "feature_1" -> new VersionLevelRange(1, 4))
+    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+
+    FinalizedFeatureCache.updateOrThrow(finalizedFeatures, 10)
+    assertEquals(finalizedFeatures, FinalizedFeatureCache.get.get.features)

Review comment:
       Should we test `isDefined` before calling `get`?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {
+      val errorMsg = ("FinalizedFeatureCache update failed due to invalid 
epoch in new finalized %s." +
+        " The existing finalized is %s").format(latest, existingStr)
+      throw new FeatureCacheUpdateException(errorMsg)
+    } else {
+      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      if (incompatibleFeatures.nonEmpty) {
+        val errorMsg = ("FinalizedFeatureCache updated failed since feature 
compatibility" +
+          " checks failed! Supported %s has incompatibilities with the latest 
finalized %s." +
+          " The incompatible features are: %s.").format(
+          SupportedFeatures.get, latest, incompatibleFeatures)
+        throw new FeatureCacheUpdateException(errorMsg)
+      }
+    }
+    val logMsg = "Updated cache from existing finalized %s to latest finalized 
%s".format(

Review comment:
       This val seems redundant.

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[VersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[VersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible between the 
latest features supported
+   * by the Broker, and the provided cluster-wide finalized features.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned 
set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[VersionLevelRange]): 
Set[String] = {
+    val supported = get
+
+    val incompatibilities = finalized.all.asScala.collect {
+      case (feature, versionLevels) => {
+        val supportedVersions = supported.get(feature);
+        if (supportedVersions == null) {
+          (feature, "{feature=%s, reason='Unsupported 
feature'}".format(feature))
+        } else if (!versionLevels.isCompatibleWith(supportedVersions)) {

Review comment:
       nit: maybe rename to `incompatibleWith` and flip the boolean

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         /* setup zookeeper */
         initZkClient(time)
 
+        /* initialize features */
+        _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient)
+        if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) {
+          // The feature versioning system (KIP-584) is active only when:
+          // config.interBrokerProtocolVersion is >= KAFKA_2_6_IV1.
+          _featureChangeListener.initOrThrow(60000)

Review comment:
       Could we make this parameter configurable?

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[VersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[VersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be incompatible between the 
latest features supported
+   * by the Broker, and the provided cluster-wide finalized features.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.

Review comment:
       This comment is a bit vague to me, what are you referring by 
`incompatibilities`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to