kowshik commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r427022316
########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; Review comment: Do you feel strongly about this? The reasons why I ask the question is: 1. Caller is unlikely to pass `null`. 2. I looked over a number of other existing classes in Kafka, and there aren't any null checks for most constructor parameters. It will help me if you could share couple examples from existing code where the `null` check convention is followed in Kafka. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java ########## @@ -0,0 +1,216 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +public class FeaturesTest { + + @Test + public void testEmptyFeatures() { + Map<String, Map<String, Long>> emptyMap = new HashMap<>(); + + Features<VersionLevelRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures(); + assertEquals(new HashMap<>(), emptyFinalizedFeatures.all()); + assertEquals(emptyMap, emptyFinalizedFeatures.serialize()); + assertEquals(emptyFinalizedFeatures, Features.deserializeFinalizedFeatures(emptyMap)); + + Features<VersionRange> emptySupportedFeatures = Features.emptySupportedFeatures(); + assertEquals(new HashMap<>(), emptySupportedFeatures.all()); + assertEquals( + new HashMap<String, HashMap<String, Long>>(), + emptySupportedFeatures.serialize()); + assertEquals(emptySupportedFeatures, Features.deserializeSupportedFeatures(emptyMap)); + } + + @Test + public void testAllAPI() { + VersionRange v1 = new VersionRange(1, 2); + VersionRange v2 = new VersionRange(3, 4); + Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() { + { + put("feature_1", v1); + put("feature_2", v2); + } + }; + Features<VersionRange> features = Features.supportedFeatures(allFeatures); + assertEquals(allFeatures, features.all()); + } + + @Test + public void testGetAPI() { + VersionRange v1 = new VersionRange(1, 2); + VersionRange v2 = new VersionRange(3, 4); + Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() { + { + put("feature_1", v1); + put("feature_2", v2); + } + }; + Features<VersionRange> features = Features.supportedFeatures(allFeatures); + assertEquals(v1, features.get("feature_1")); + assertEquals(v2, features.get("feature_2")); + assertNull(features.get("nonexistent_feature")); + } + + @Test + public void testSerializeDeserializeSupportedFeatures() { + VersionRange v1 = new VersionRange(1, 2); + VersionRange v2 = new VersionRange(3, 4); + Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() { Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { Review comment: Done. Yes, I've changed it to default visibility now. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + public VersionRangeType get(String feature) { + return all().get(feature); + } + + public String toString() { + return String.format( + "Features{%s}", + features + .entrySet() + .stream() + .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue())) + .collect(joining(", ")) + ); + } + + /** + * @return Serializes the underlying features to a map, and returns the same. + * The returned value can be deserialized using one of the deserialize* APIs. + */ + public Map<String, Map<String, Long>> serialize() { + return features.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().serialize())); + } + + /** + * Deserializes a map to Features<VersionLevelRange>. + * + * @param serialized the serialized representation of a Features<VersionLevelRange> object, + * generated using the serialize() API. + * + * @return the deserialized Features<VersionLevelRange> object + */ + public static Features<VersionLevelRange> deserializeFinalizedFeatures( + Map<String, Map<String, Long>> serialized) { + return finalizedFeatures(serialized.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> VersionLevelRange.deserialize(entry.getValue())))); + } + + /** + * Deserializes a map to Features<VersionRange>. + * + * @param serialized the serialized representation of a Features<VersionRange> object, + * generated using the serialize() API. + * + * @return the deserialized Features<VersionRange> object + */ + public static Features<VersionRange> deserializeSupportedFeatures( + Map<String, Map<String, Long>> serialized) { + return supportedFeatures(serialized.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> VersionRange.deserialize(entry.getValue())))); + } + + @Override + public boolean equals(Object other) { + if (this == other) { Review comment: Done. Good point! Added test as well. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java ########## @@ -0,0 +1,216 @@ +package org.apache.kafka.common.feature; Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") + if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) { Review comment: Done. ########## File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala ########## @@ -0,0 +1,95 @@ +package kafka.server + +import org.apache.kafka.common.feature.{Features, VersionLevelRange, VersionRange} +import org.junit.Assert.{assertEquals, assertThrows, assertTrue} +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +class FinalizedFeatureCacheTest { + + @Before + def setUp(): Unit = { + FinalizedFeatureCache.clear() + SupportedFeatures.clear() + } + + @Test + def testEmpty(): Unit = { + assertTrue(FinalizedFeatureCache.get.isEmpty) + } + + @Test + def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = { + val supportedFeatures = Map[String, VersionRange]( + "feature_1" -> new VersionRange(1, 4)) + SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava)) + + val features = Map[String, VersionLevelRange]( + "feature_1" -> new VersionLevelRange(1, 4)) + val finalizedFeatures = Features.finalizedFeatures(features.asJava) + + FinalizedFeatureCache.updateOrThrow(finalizedFeatures, 10) + assertEquals(finalizedFeatures, FinalizedFeatureCache.get.get.features) Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") + if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) { + val errorMsg = ("FinalizedFeatureCache update failed due to invalid epoch in new finalized %s." + + " The existing finalized is %s").format(latest, existingStr) + throw new FeatureCacheUpdateException(errorMsg) + } else { + val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + if (incompatibleFeatures.nonEmpty) { + val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" + Review comment: It is used intentionally to split the log message into 2 lines (for ~100-char readability limit per line). Otherwise the string will be huge and all in the same line. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + public VersionRangeType get(String feature) { + return all().get(feature); Review comment: The underlying data structure is a `Map`. It would be simpler if this method just returns `null` if the feature doesn't exist. For example, that is how java's `Map.get` works, here is the javadoc: https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#get-java.lang.Object-. Also, I've documented this method now (doc was previously absent). ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); Review comment: Done. Good point! ########## File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java ########## @@ -0,0 +1,216 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +public class FeaturesTest { + + @Test + public void testEmptyFeatures() { + Map<String, Map<String, Long>> emptyMap = new HashMap<>(); + + Features<VersionLevelRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures(); + assertEquals(new HashMap<>(), emptyFinalizedFeatures.all()); + assertEquals(emptyMap, emptyFinalizedFeatures.serialize()); + assertEquals(emptyFinalizedFeatures, Features.deserializeFinalizedFeatures(emptyMap)); + + Features<VersionRange> emptySupportedFeatures = Features.emptySupportedFeatures(); + assertEquals(new HashMap<>(), emptySupportedFeatures.all()); + assertEquals( + new HashMap<String, HashMap<String, Long>>(), + emptySupportedFeatures.serialize()); + assertEquals(emptySupportedFeatures, Features.deserializeSupportedFeatures(emptyMap)); + } + + @Test + public void testAllAPI() { Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java ########## @@ -0,0 +1,104 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an immutable basic version range using 2 attributes: min and max of type long. + * The min and max attributes are expected to be >= 1, and with max >= min. + * + * The class also provides API to serialize/deserialize the version range to/from a map. + * The class allows for configurable labels for the min/max attributes, which can be specialized by + * sub-classes (if needed). + * + * NOTE: This is the backing class used to define the min/max versions for supported features. + */ +public class VersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_KEY_LABEL = "min_version"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. + private static final String MAX_VERSION_KEY_LABEL = "max_version"; + + private final String minKeyLabel; + + private final long minValue; + + private final String maxKeyLabel; + + private final long maxValue; + + protected VersionRange(String minKey, long minValue, String maxKeyLabel, long maxValue) { + if (minValue < 1 || maxValue < 1 || maxValue < minValue) { + throw new IllegalArgumentException( + String.format( + "Expected minValue > 1, maxValue > 1 and maxValue >= minValue, but received" + + " minValue: %d, maxValue: %d", minValue, maxValue)); + } + this.minKeyLabel = minKey; + this.minValue = minValue; + this.maxKeyLabel = maxKeyLabel; + this.maxValue = maxValue; + } + + public VersionRange(long minVersion, long maxVersion) { + this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, maxVersion); + } + + public long min() { + return minValue; + } + + public long max() { + return maxValue; + } + + public String toString() { + return String.format("%s[%d, %d]", this.getClass().getSimpleName(), min(), max()); + } + + public Map<String, Long> serialize() { + return new HashMap<String, Long>() { + { + put(minKeyLabel, min()); + put(maxKeyLabel, max()); + } + }; + } + + public static VersionRange deserialize(Map<String, Long> serialized) { + return new VersionRange( + valueOrThrow(MIN_VERSION_KEY_LABEL, serialized), + valueOrThrow(MAX_VERSION_KEY_LABEL, serialized)); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof VersionRange)) { + return false; + } + + final VersionRange that = (VersionRange) other; + return Objects.equals(this.minKeyLabel, that.minKeyLabel) && Review comment: It provides slightly better convenience: `Object.equals` will also take care of the `null` checks for you. Also it turned out it was overkill to use `Objects.equals` for primitive type checks for `minValue` and `maxValue`. So I've simplified the code to use `==` those attributes. Good point! ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + public VersionRangeType get(String feature) { + return all().get(feature); + } + + public String toString() { + return String.format( + "Features{%s}", + features + .entrySet() + .stream() + .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue())) + .collect(joining(", ")) + ); + } + + /** + * @return Serializes the underlying features to a map, and returns the same. + * The returned value can be deserialized using one of the deserialize* APIs. + */ + public Map<String, Map<String, Long>> serialize() { + return features.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().serialize())); + } + + /** + * Deserializes a map to Features<VersionLevelRange>. Review comment: Done. ########## File path: clients/src/main/resources/common/message/ApiVersionsResponse.json ########## @@ -42,6 +42,33 @@ "about": "The maximum supported version, inclusive." } ]}, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, - "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, Review comment: Are you sure? All newly added fields are tagged (i.e. optional). Going by [this documentation](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields#KIP-482:TheKafkaProtocolshouldSupportOptionalTaggedFields-TaggedFields) in KIP-482, it is not required to change the schema version whenever tagged fields are introduced. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + public VersionRangeType get(String feature) { + return all().get(feature); + } + + public String toString() { + return String.format( + "Features{%s}", + features + .entrySet() + .stream() + .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue())) + .collect(joining(", ")) + ); + } + + /** + * @return Serializes the underlying features to a map, and returns the same. Review comment: Done. ########## File path: core/src/main/scala/kafka/cluster/Broker.scala ########## @@ -34,14 +36,19 @@ object Broker { brokerId: Int, endpoints: util.List[Endpoint], interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo + + def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = { + new Broker(id, endPoints, rack, emptySupportedFeatures) + } } /** * A Kafka broker. - * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map. + * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map, Review comment: Done. Good point! ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { Review comment: Done. Good point! ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) { } } - public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) { + public static ApiVersionsResponse apiVersionsResponse( Review comment: Done. Good point! ########## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java ########## @@ -0,0 +1,104 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an immutable basic version range using 2 attributes: min and max of type long. + * The min and max attributes are expected to be >= 1, and with max >= min. + * + * The class also provides API to serialize/deserialize the version range to/from a map. + * The class allows for configurable labels for the min/max attributes, which can be specialized by + * sub-classes (if needed). + * + * NOTE: This is the backing class used to define the min/max versions for supported features. + */ +public class VersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_KEY_LABEL = "min_version"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. + private static final String MAX_VERSION_KEY_LABEL = "max_version"; + + private final String minKeyLabel; + + private final long minValue; + + private final String maxKeyLabel; + + private final long maxValue; + + protected VersionRange(String minKey, long minValue, String maxKeyLabel, long maxValue) { + if (minValue < 1 || maxValue < 1 || maxValue < minValue) { + throw new IllegalArgumentException( + String.format( + "Expected minValue > 1, maxValue > 1 and maxValue >= minValue, but received" + + " minValue: %d, maxValue: %d", minValue, maxValue)); + } + this.minKeyLabel = minKey; + this.minValue = minValue; + this.maxKeyLabel = maxKeyLabel; + this.maxValue = maxValue; + } + + public VersionRange(long minVersion, long maxVersion) { + this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, maxVersion); + } + + public long min() { + return minValue; + } + + public long max() { + return maxValue; + } + + public String toString() { + return String.format("%s[%d, %d]", this.getClass().getSimpleName(), min(), max()); + } + + public Map<String, Long> serialize() { + return new HashMap<String, Long>() { + { + put(minKeyLabel, min()); + put(maxKeyLabel, max()); + } + }; + } + + public static VersionRange deserialize(Map<String, Long> serialized) { + return new VersionRange( + valueOrThrow(MIN_VERSION_KEY_LABEL, serialized), + valueOrThrow(MAX_VERSION_KEY_LABEL, serialized)); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof VersionRange)) { Review comment: Done. Also added a test. Good catch! ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + public VersionRangeType get(String feature) { + return all().get(feature); Review comment: Done. Good point! ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -77,6 +93,18 @@ public boolean shouldClientThrottle(short version) { return version >= 2; } + public SupportedFeatureKey supportedFeature(String featureName) { Review comment: Done. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java ########## @@ -0,0 +1,162 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class VersionLevelRangeTest { + + @Test + public void testCreateFailDueToInvalidParams() { + // min and max can't be < 1. Review comment: Done. Some of it is not required. Good point, I have removed the unnecessary testing now. We still need to check if exception is thrown in these 4 basic tests: min < 1, max < 1, min & max < 1 and max > min. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java ########## @@ -0,0 +1,216 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +public class FeaturesTest { + + @Test + public void testEmptyFeatures() { + Map<String, Map<String, Long>> emptyMap = new HashMap<>(); + + Features<VersionLevelRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures(); + assertEquals(new HashMap<>(), emptyFinalizedFeatures.all()); + assertEquals(emptyMap, emptyFinalizedFeatures.serialize()); + assertEquals(emptyFinalizedFeatures, Features.deserializeFinalizedFeatures(emptyMap)); + + Features<VersionRange> emptySupportedFeatures = Features.emptySupportedFeatures(); + assertEquals(new HashMap<>(), emptySupportedFeatures.all()); + assertEquals( + new HashMap<String, HashMap<String, Long>>(), + emptySupportedFeatures.serialize()); + assertEquals(emptySupportedFeatures, Features.deserializeSupportedFeatures(emptyMap)); + } + + @Test + public void testAllAPI() { + VersionRange v1 = new VersionRange(1, 2); + VersionRange v2 = new VersionRange(3, 4); + Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() { + { + put("feature_1", v1); + put("feature_2", v2); + } + }; + Features<VersionRange> features = Features.supportedFeatures(allFeatures); + assertEquals(allFeatures, features.all()); + } + + @Test + public void testGetAPI() { + VersionRange v1 = new VersionRange(1, 2); + VersionRange v2 = new VersionRange(3, 4); + Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() { Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + Review comment: Done. Removed it. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { Review comment: Done. It was unused and I have eliminated it now. ########## File path: core/src/main/scala/kafka/cluster/Broker.scala ########## @@ -34,14 +36,19 @@ object Broker { brokerId: Int, endpoints: util.List[Endpoint], interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo + + def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = { Review comment: No, this constructor overload was simply created to avoid a churn of test code at number of places adding the additional `SupportedFeatures` parameter. How do you feel about keeping it? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) { } } - public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) { + public static ApiVersionsResponse apiVersionsResponse( + int throttleTimeMs, + byte maxMagic, + Features<VersionRange> latestSupportedFeatures, + Optional<Features<VersionLevelRange>> finalizedFeatures, + Optional<Long> finalizedFeaturesEpoch) { if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) { return DEFAULT_API_VERSIONS_RESPONSE; } - return createApiVersionsResponse(throttleTimeMs, maxMagic); + return createApiVersionsResponse( + throttleTimeMs, maxMagic, latestSupportedFeatures, finalizedFeatures, finalizedFeaturesEpoch); } - public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) { + public static ApiVersionsResponse createApiVersionsResponse( Review comment: The tests have been already added. Pls check out the tests added in `ApiVersionsResponseTest.java`, particularly: `shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle`. Let me know if this test does not look sufficient. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/VersionRangeTest.java ########## @@ -0,0 +1,150 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class VersionRangeTest { + @Test + public void testFailDueToInvalidParams() { + // min and max can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(0, 0)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(-1, -1)); + // min can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(0, 1)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(-1, 1)); + // max can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(1, 0)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(1, -1)); + // min can't be > max. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(2, 1)); + } + + @Test + public void testSerializeDeserializeTest() { + VersionRange versionRange = new VersionRange(1, 2); + assertEquals(1, versionRange.min()); + assertEquals(2, versionRange.max()); + + Map<String, Long> serialized = versionRange.serialize(); + assertEquals( + new HashMap<String, Long>() { + { + put("min_version", versionRange.min()); + put("max_version", versionRange.max()); + } + }, + serialized + ); + + VersionRange deserialized = VersionRange.deserialize(serialized); + assertEquals(1, deserialized.min()); + assertEquals(2, deserialized.max()); + assertEquals(versionRange, deserialized); + } + + @Test + public void testDeserializationFailureTest() { + // min_version can't be < 1. + Map<String, Long> invalidWithBadMinVersion = new HashMap<String, Long>() { + { + put("min_version", 0L); + put("max_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithBadMinVersion)); + + // max_version can't be < 1. + Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, Long>() { + { + put("min_version", 1L); + put("max_version", 0L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithBadMaxVersion)); + + // min_version and max_version can't be < 1. + Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, Long>() { + { + put("min_version", 0L); + put("max_version", 0L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithBadMinMaxVersion)); + + // min_version can't be > max_version. + Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, Long>() { + { + put("min_version", 2L); + put("max_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithLowerMaxVersion)); + + // min_version key missing. + Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, Long>() { + { + put("max_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithMinKeyMissing)); + + // max_version key missing. + Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, Long>() { + { + put("min_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithMaxKeyMissing)); + } + + @Test + public void testToString() { + assertEquals("VersionRange[1, 1]", new VersionRange(1, 1).toString()); + assertEquals("VersionRange[1, 2]", new VersionRange(1, 2).toString()); + } + + @Test + public void testEquals() { + assertTrue(new VersionRange(1, 1).equals(new VersionRange(1, 1))); + assertFalse(new VersionRange(1, 1).equals(new VersionRange(1, 2))); + } + + @Test + public void testGetters() { Review comment: Done. ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[VersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[VersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be incompatible between the latest features supported + * by the Broker, and the provided cluster-wide finalized features. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The set of incompatible feature names. If the returned set is empty, it + * means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[VersionLevelRange]): Set[String] = { + val supported = get + + val incompatibilities = finalized.all.asScala.collect { + case (feature, versionLevels) => { + val supportedVersions = supported.get(feature); + if (supportedVersions == null) { + (feature, "{feature=%s, reason='Unsupported feature'}".format(feature)) + } else if (!versionLevels.isCompatibleWith(supportedVersions)) { Review comment: Done. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java ########## @@ -0,0 +1,162 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class VersionLevelRangeTest { + + @Test + public void testCreateFailDueToInvalidParams() { + // min and max can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(0, 0)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(-1, -1)); + // min can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(0, 1)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(-1, 1)); + // max can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(1, 0)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(1, -1)); + // min can't be > max. + assertThrows( + IllegalArgumentException.class, + () -> new VersionLevelRange(2, 1)); + } + + @Test + public void testSerializeDeserialize() { + VersionLevelRange versionLevelRange = new VersionLevelRange(1, 2); + assertEquals(1, versionLevelRange.min()); + assertEquals(2, versionLevelRange.max()); + + Map<String, Long> serialized = versionLevelRange.serialize(); + assertEquals( + new HashMap<String, Long>() { + { + put("min_version_level", versionLevelRange.min()); + put("max_version_level", versionLevelRange.max()); + } + }, + serialized + ); + + VersionLevelRange deserialized = VersionLevelRange.deserialize(serialized); + assertEquals(1, deserialized.min()); + assertEquals(2, deserialized.max()); + assertEquals(versionLevelRange, deserialized); + } + + @Test + public void testDeserializationFailureTest() { + // min_version_level can't be < 1. + Map<String, Long> invalidWithBadMinVersion = new HashMap<String, Long>() { + { + put("min_version_level", 0L); + put("max_version_level", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionLevelRange.deserialize(invalidWithBadMinVersion)); + + // max_version_level can't be < 1. + Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, Long>() { + { + put("min_version_level", 1L); + put("max_version_level", 0L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionLevelRange.deserialize(invalidWithBadMaxVersion)); + + // min_version_level and max_version_level can't be < 1. + Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, Long>() { + { + put("min_version_level", 0L); + put("max_version_level", 0L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionLevelRange.deserialize(invalidWithBadMinMaxVersion)); + + // min_version_level can't be > max_version_level. + Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, Long>() { + { + put("min_version_level", 2L); + put("max_version_level", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionLevelRange.deserialize(invalidWithLowerMaxVersion)); + + // min_version_level key missing. + Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, Long>() { + { + put("max_version_level", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionLevelRange.deserialize(invalidWithMinKeyMissing)); + + // max_version_level key missing. + Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, Long>() { + { + put("min_version_level", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionLevelRange.deserialize(invalidWithMaxKeyMissing)); + } + + @Test + public void testToString() { + assertEquals("VersionLevelRange[1, 1]", new VersionLevelRange(1, 1).toString()); + assertEquals("VersionLevelRange[1, 2]", new VersionLevelRange(1, 2).toString()); + } + + @Test + public void testEquals() { + assertTrue(new VersionLevelRange(1, 1).equals(new VersionLevelRange(1, 1))); + assertFalse(new VersionLevelRange(1, 1).equals(new VersionLevelRange(1, 2))); + } + + @Test + public void testIsCompatibleWith() { + assertTrue(new VersionLevelRange(1, 1).isCompatibleWith(new VersionRange(1, 1))); + assertTrue(new VersionLevelRange(2, 3).isCompatibleWith(new VersionRange(1, 4))); + assertTrue(new VersionLevelRange(1, 4).isCompatibleWith(new VersionRange(1, 4))); + + assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new VersionRange(2, 3))); + assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new VersionRange(2, 4))); + assertFalse(new VersionLevelRange(2, 4).isCompatibleWith(new VersionRange(2, 3))); + } + + @Test + public void testGetters() { + assertEquals(1, new VersionLevelRange(1, 2).min()); + assertEquals(2, new VersionLevelRange(1, 2).max()); + } +} Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. Review comment: Done. ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -81,17 +83,27 @@ object BrokerIdsZNode { object BrokerInfo { /** - * Create a broker info with v4 json format (which includes multiple endpoints and rack) if - * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above. + * - Create a broker info with v4 json format (which includes multiple endpoints and rack) if + * the apiVersion is 0.10.0.X or above but lesser than 2.6.x. + * - Register the broker with v2 json format otherwise. * * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2. * - * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to - * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case). + * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above + * without having to upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in + * any case). */ def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo = { - // see method documentation for the reason why we do this - val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 + val version = { Review comment: Done. Good point! ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[VersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[VersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be incompatible between the latest features supported + * by the Broker, and the provided cluster-wide finalized features. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The set of incompatible feature names. If the returned set is empty, it + * means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[VersionLevelRange]): Set[String] = { + val supported = get Review comment: Done. ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { Review comment: Done. ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[VersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[VersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be incompatible between the latest features supported + * by the Broker, and the provided cluster-wide finalized features. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. Review comment: Good point. I have improved the doc now. Let me know how you feel about it. ########## File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala ########## @@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo createRecursive(path, data = null, throwIfPathExists = false) } + // Visible for testing. + def createFeatureZNode(nodeContents: FeatureZNode): Unit = { Review comment: Yes, this will get used in the future. For example the write path will use it. ########## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ########## @@ -149,6 +153,53 @@ class BrokerEndPointTest { assertEquals(None, broker.rack) } + @Test + def testFromJsonV5(): Unit = { Review comment: In my understanding, this is an impossible case. Because, we always write features into the JSON only in v5 or above. That is why, there is no test for it. Let me know how you feel about it. ########## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ########## @@ -149,6 +153,53 @@ class BrokerEndPointTest { assertEquals(None, broker.rack) } + @Test + def testFromJsonV5(): Unit = { + val json = """{ + "version":5, + "host":"localhost", + "port":9092, + "jmx_port":9999, + "timestamp":"2233345666", + "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], + "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, + "rack":"dc1", + "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} + }""" + val broker = parseBrokerJson(1, json) + assertEquals(1, broker.id) + val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT")) + assertEquals("host1", brokerEndPoint.host) + assertEquals(9092, brokerEndPoint.port) + assertEquals(Some("dc1"), broker.rack) + assertEquals(Features.supportedFeatures( + Map[String, VersionRange]( + "feature1" -> new VersionRange(1, 2), + "feature2" -> new VersionRange(2, 4)).asJava), + broker.features) + } + + @Test + def testFromJsonV4WithNoFeatures(): Unit = { Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") + if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) { + val errorMsg = ("FinalizedFeatureCache update failed due to invalid epoch in new finalized %s." + + " The existing finalized is %s").format(latest, existingStr) + throw new FeatureCacheUpdateException(errorMsg) + } else { + val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + if (incompatibleFeatures.nonEmpty) { + val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" + + " checks failed! Supported %s has incompatibilities with the latest finalized %s." + + " The incompatible features are: %s.").format( + SupportedFeatures.get, latest, incompatibleFeatures) + throw new FeatureCacheUpdateException(errorMsg) + } + } + val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format( Review comment: It is used intentionally to split the log message into 2 lines (for ~100-char readability limit per line). Otherwise the string will be huge and all in the same line. ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -225,7 +255,12 @@ object BrokerIdZNode { } val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]]) - BrokerInfo(Broker(id, endpoints, rack), version, jmxPort) + val features = FeatureZNode.asJavaMap(brokerInfo Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,200 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. + object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + processNotification() Review comment: I have added comments now to the code. The idea I had was that this event may happen, rarely (ex: operational error). In such a case, we do not want to kill the brokers, so we just log a warning and treat the case as if the node is absent, and populate the cache with empty features. So, this case is actually handled inside `FeatureCacheUpdater.updateLatestOrThrow()`. The call to read ZK node will return `ZkVersion.UnknownVersion` whenever the node does not exist in ZK, and I've explicitly handled this returned version. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. Review comment: As we discussed over slack today, this exception is already handled in `ChangeNotificationProcessorThread.doWork()` method defined in `FinalizedFeatureChangeListener.scala`. Basically, the ZK change notification processor thread exits the Broker with a fatal error (non-zero exit code) when this exception (or any exception) is caught while trying to update `FinalizedFeatureCache`. ########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP /* setup zookeeper */ initZkClient(time) + /* initialize features */ + _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient) + if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) { + // The feature versioning system (KIP-584) is active only when: + // config.interBrokerProtocolVersion is >= KAFKA_2_6_IV1. + _featureChangeListener.initOrThrow(60000) Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionLevelRange.java ########## @@ -0,0 +1,39 @@ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of VersionRange representing a range of version levels. The main specialization + * is that the class uses different serialization keys for min/max attributes. + * + * NOTE: This is the backing class used to define the min/max version levels for finalized features. + */ +public class VersionLevelRange extends VersionRange { Review comment: Done. Good point! - I have now created 3 classes as you proposed. `BaseVersionRange` is the base class, and, `SupportedVersionRange` & `FinalizedVersionRange` are it's child classes. - The key labels couldn't be made into abstract functions since these constants are needed within `deserialize()` which is a static method defined in the child classes. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org