mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r875213585


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########
@@ -17,32 +17,94 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.MetadataVersion;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
 
 /**
- * A holder class of the local node's supported feature flags.
+ * A holder class of the local node's supported feature flags as well as the 
ApiVersions of other nodes.
  */
 public class QuorumFeatures {
     private final int nodeId;
+    private final ApiVersions apiVersions;
     private final Map<String, VersionRange> supportedFeatures;
+    private final List<Integer> quorumNodeIds;
 
-    QuorumFeatures(int nodeId,
-                          Map<String, VersionRange> supportedFeatures) {
+    QuorumFeatures(
+        int nodeId,
+        ApiVersions apiVersions,
+        Map<String, VersionRange> supportedFeatures,
+        List<Integer> quorumNodeIds
+    ) {
         this.nodeId = nodeId;
+        this.apiVersions = apiVersions;
         this.supportedFeatures = 
Collections.unmodifiableMap(supportedFeatures);
+        this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
     }
 
-    public static QuorumFeatures create(int nodeId,
-                                        Map<String, VersionRange> 
supportedFeatures) {
-        return new QuorumFeatures(nodeId, supportedFeatures);
+    public static QuorumFeatures create(
+        int nodeId,
+        ApiVersions apiVersions,
+        Map<String, VersionRange> supportedFeatures,
+        Collection<Node> quorumNodes
+    ) {
+        List<Integer> nodeIds = 
quorumNodes.stream().map(Node::id).collect(Collectors.toList());
+        return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, 
nodeIds);
     }
 
     public static Map<String, VersionRange> defaultFeatureMap() {
-        return Collections.emptyMap();
+        Map<String, VersionRange> features = new HashMap<>(1);
+        features.put(MetadataVersion.FEATURE_NAME, 
VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), 
MetadataVersion.latest().featureLevel()));
+        return features;
+    }
+
+    Optional<VersionRange> quorumSupportedFeature(String featureName) {
+        List<VersionRange> supportedVersions = new 
ArrayList<>(quorumNodeIds.size());
+        for (int nodeId : quorumNodeIds) {
+            if (nodeId == this.nodeId) {

Review Comment:
   Adding a log message seems reasonable here.
   
   We _could_ add an eager check on the number of peer NodeApiVersions that 
we're checking, but I don't think we really need to. We add nodes to 
ApiVersions before any fetches can occur, and we only remove nodes from 
ApiVersions when processing a network disconnect. Like you said, if we have 
less than half of the peer's NodeApiVersions, then we wouldn't have a quorum. 
It also kind of seems awkward to have some quorum semantics in validation logic 
like this (i.e., `if W < N/2`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to