artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1609158284
##########
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##########
@@ -75,16 +75,19 @@ object BrokerFeatures extends Logging {
}
def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean):
Features[SupportedVersionRange] = {
- Features.supportedFeatures(
- java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+ val features = new util.HashMap[String, SupportedVersionRange]()
+ features.put(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
- }
- )))
+ }))
+ org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach {
feature =>
Review Comment:
Any reasons this is a fully qualified class name and not just imported as
it's usually done?
##########
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##########
@@ -16,72 +16,135 @@
*/
package org.apache.kafka.server.common;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+ /**
+ * Features defined. If a feature is included in this list, and marked to
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link
FeatureVersion} when implementing a new feature.
+ */
+ TEST_VERSION("test.feature.version", TestFeatureVersion.values(),
TestFeatureVersion::fromFeatureLevel, false);
-public final class Features {
- private final MetadataVersion version;
- private final Map<String, Short> finalizedFeatures;
- private final long finalizedFeaturesEpoch;
+ public static final Features[] FEATURES;
+ public static final List<Features> PRODUCTION_FEATURES;
+ private final String name;
+ private final FeatureVersion[] features;
+ private final CreateMethod createFeatureVersionMethod;
+ private final boolean usedInProduction;
- public static Features fromKRaftVersion(MetadataVersion version) {
- return new Features(version, Collections.emptyMap(), -1, true);
+ Features(String name,
+ FeatureVersion[] features,
+ CreateMethod createMethod,
+ boolean usedInProduction) {
+ this.name = name;
+ this.features = features;
+ this.createFeatureVersionMethod = createMethod;
+ this.usedInProduction = usedInProduction;
}
- public Features(
- MetadataVersion version,
- Map<String, Short> finalizedFeatures,
- long finalizedFeaturesEpoch,
- boolean kraftMode
- ) {
- this.version = version;
- this.finalizedFeatures = new HashMap<>(finalizedFeatures);
- this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
- // In KRaft mode, we always include the metadata version in the
features map.
- // In ZK mode, we never include it.
- if (kraftMode) {
- this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
- } else {
- this.finalizedFeatures.remove(FEATURE_NAME);
- }
+ static {
+ Features[] enumValues = Features.values();
+ FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+ PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+ feature.usedInProduction).collect(Collectors.toList());
}
- public MetadataVersion metadataVersion() {
- return version;
+ public String featureName() {
+ return name;
}
- public Map<String, Short> finalizedFeatures() {
- return finalizedFeatures;
+ public FeatureVersion[] features() {
+ return features;
}
- public long finalizedFeaturesEpoch() {
- return finalizedFeaturesEpoch;
+ /**
+ * Creates a FeatureVersion from a given name and level with the correct
feature object underneath.
+ *
+ * @param level the level of the feature
+ * @returns the FeatureVersionUtils.FeatureVersion for the feature
the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not
valid (not implemented for this method)
+ */
+ public FeatureVersion fromFeatureLevel(short level) {
+ return createFeatureVersionMethod.fromFeatureLevel(level);
}
- @Override
- public boolean equals(Object o) {
- if (o == null || !(o.getClass().equals(Features.class))) return false;
- Features other = (Features) o;
- return version == other.version &&
- finalizedFeatures.equals(other.finalizedFeatures) &&
- finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+ /**
+ * A method to validate the feature can be set. If a given feature relies
on another feature, the dependencies should be
+ * captured in {@link FeatureVersion#dependencies()}
+ * <p>
+ * For example, say feature X level x relies on feature Y level y:
+ * if feature X >= x then throw an error if feature Y < y.
+ *
+ * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in
order to write the feature records to the cluster.
+ *
+ * @param feature the feature we are validating
+ * @param metadataVersion the metadata version we have (or want
to set)
+ * @param features the feature versions (besides
MetadataVersion) we have (or want to set)
+ * @throws IllegalArgumentException if the feature is not valid
+ */
+ public static void validateVersion(FeatureVersion feature, MetadataVersion
metadataVersion, Map<String, Short> features) {
+ if (feature.featureLevel() >= 1 &&
metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0))
+ throw new IllegalArgumentException(feature.featureName() + " could
not be set to " + feature.featureLevel() +
+ " because it depends on metadata.version=14 (" +
MetadataVersion.IBP_3_3_IV0 + ")");
Review Comment:
Should it be `metadata.version=4`?
##########
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##########
@@ -16,72 +16,135 @@
*/
package org.apache.kafka.server.common;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+ /**
+ * Features defined. If a feature is included in this list, and marked to
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link
FeatureVersion} when implementing a new feature.
+ */
+ TEST_VERSION("test.feature.version", TestFeatureVersion.values(),
TestFeatureVersion::fromFeatureLevel, false);
-public final class Features {
- private final MetadataVersion version;
- private final Map<String, Short> finalizedFeatures;
- private final long finalizedFeaturesEpoch;
+ public static final Features[] FEATURES;
+ public static final List<Features> PRODUCTION_FEATURES;
+ private final String name;
+ private final FeatureVersion[] features;
+ private final CreateMethod createFeatureVersionMethod;
+ private final boolean usedInProduction;
- public static Features fromKRaftVersion(MetadataVersion version) {
- return new Features(version, Collections.emptyMap(), -1, true);
+ Features(String name,
+ FeatureVersion[] features,
+ CreateMethod createMethod,
+ boolean usedInProduction) {
+ this.name = name;
+ this.features = features;
+ this.createFeatureVersionMethod = createMethod;
+ this.usedInProduction = usedInProduction;
}
- public Features(
- MetadataVersion version,
- Map<String, Short> finalizedFeatures,
- long finalizedFeaturesEpoch,
- boolean kraftMode
- ) {
- this.version = version;
- this.finalizedFeatures = new HashMap<>(finalizedFeatures);
- this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
- // In KRaft mode, we always include the metadata version in the
features map.
- // In ZK mode, we never include it.
- if (kraftMode) {
- this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
- } else {
- this.finalizedFeatures.remove(FEATURE_NAME);
- }
+ static {
+ Features[] enumValues = Features.values();
+ FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+ PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+ feature.usedInProduction).collect(Collectors.toList());
}
- public MetadataVersion metadataVersion() {
- return version;
+ public String featureName() {
+ return name;
}
- public Map<String, Short> finalizedFeatures() {
- return finalizedFeatures;
+ public FeatureVersion[] features() {
+ return features;
}
- public long finalizedFeaturesEpoch() {
- return finalizedFeaturesEpoch;
+ /**
+ * Creates a FeatureVersion from a given name and level with the correct
feature object underneath.
+ *
+ * @param level the level of the feature
+ * @returns the FeatureVersionUtils.FeatureVersion for the feature
the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not
valid (not implemented for this method)
+ */
+ public FeatureVersion fromFeatureLevel(short level) {
+ return createFeatureVersionMethod.fromFeatureLevel(level);
Review Comment:
Can we just iterate over `features` here and if `features.featureLevel() ==
level` then return, if not found throw?
##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -459,18 +459,20 @@ BrokerFeature processRegistrationFeature(
FinalizedControllerFeatures finalizedFeatures,
BrokerRegistrationRequestData.Feature feature
) {
- Optional<Short> finalized = finalizedFeatures.get(feature.name());
- if (finalized.isPresent()) {
- if (!VersionRange.of(feature.minSupportedVersion(),
feature.maxSupportedVersion()).contains(finalized.get())) {
- throw new UnsupportedVersionException("Unable to register
because the broker " +
- "does not support version " + finalized.get() + " of " +
feature.name() +
- ". It wants a version between " +
feature.minSupportedVersion() + " and " +
- feature.maxSupportedVersion() + ", inclusive.");
- }
- } else {
- log.warn("Broker {} registered with feature {} that is unknown to
the controller",
+ int defaultVersion =
feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default
value for MetadataVersion is 1 not 0.
+ short finalized = finalizedFeatures.getOrDefault(feature.name(),
(short) defaultVersion);
+ if (!VersionRange.of(feature.minSupportedVersion(),
feature.maxSupportedVersion()).contains(finalized)) {
+ throw new UnsupportedVersionException("Unable to register because
the broker " +
+ "does not support version " + finalized + " of " +
feature.name() +
+ ". It wants a version between " +
feature.minSupportedVersion() + " and " +
+ feature.maxSupportedVersion() + ", inclusive.");
+ }
+ // A feature is not found in the finalizedFeature map if it is unknown
to the controller or set to 0 (feature not enabled).
+ // As more features roll out, it may be common to leave a feature
disabled, so this log is debug level in the case
+ // an intended feature is not being set.
+ if (finalized == 0)
+ log.debug("Broker {} registered with feature {} that is either
unknown or version 0 on the controller",
Review Comment:
We can validate the feature vs. Features.PRODUCTION_FEATURES, if it's not
present there, we can log a warning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]