cmccabe commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r871856970
########## metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java: ########## @@ -17,32 +17,73 @@ package org.apache.kafka.controller; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.common.Node; import org.apache.kafka.metadata.VersionRange; +import org.apache.kafka.server.common.MetadataVersion; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; /** - * A holder class of the local node's supported feature flags. + * A holder class of the local node's supported feature flags as well as the ApiVersions of other nodes. */ public class QuorumFeatures { private final int nodeId; + private final ApiVersions apiVersions; private final Map<String, VersionRange> supportedFeatures; + private final List<Integer> quorumNodeIds; - QuorumFeatures(int nodeId, - Map<String, VersionRange> supportedFeatures) { + public QuorumFeatures(int nodeId, + ApiVersions apiVersions, + Map<String, VersionRange> supportedFeatures, + List<Integer> quorumNodeIds) { this.nodeId = nodeId; + this.apiVersions = apiVersions; this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures); + this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds); } public static QuorumFeatures create(int nodeId, - Map<String, VersionRange> supportedFeatures) { - return new QuorumFeatures(nodeId, supportedFeatures); + ApiVersions apiVersions, + Map<String, VersionRange> supportedFeatures, + Collection<Node> quorumNodes) { + List<Integer> nodeIds = quorumNodes.stream().map(Node::id).collect(Collectors.toList()); + return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, nodeIds); } public static Map<String, VersionRange> defaultFeatureMap() { - return Collections.emptyMap(); + Map<String, VersionRange> features = new HashMap<>(1); + features.put(MetadataVersion.FEATURE_NAME, 
VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel())); + return features; + } + + Optional<VersionRange> quorumSupportedFeature(String featureName) { Review Comment: I could be wrong about this, but I think this code might be a lot simpler without streams. I think a "for" loop would make it a lot clearer what was going on. Another thing, I guess, is that null should map to 0 (if you don't have an entry in the map for a feature, you "have" that feature at level 0). So then the min is 0 (which means you can't enable it, etc.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org