dengziming commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r870922245


##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -102,6 +106,8 @@ class KafkaRaftServer(
       threadNamePrefix,
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
+      raftApiVersions,
+      Some(bootstrapMetadata)

Review Comment:
   Yeah, that makes sense. But I want to clarify one thing here:
   > we should have the FeatureLevelRecord written out before any broker registrations occur
   
   The broker registers itself regardless of whether the FeatureLevelRecord has 
   been written, but it only sets wantFence=false in the BrokerHeartbeatRequest 
   after the FeatureLevelRecord has been published.
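
   A minimal sketch of the ordering described above, assuming the only 
   condition for unfencing is that a FeatureLevelRecord has been published. 
   `BrokerFencingSketch` and its methods are hypothetical stand-ins, not 
   Kafka's broker classes, and the real broker may check further conditions.

```java
import java.util.Optional;

// Hypothetical model of the broker-side state; not Kafka's actual implementation.
final class BrokerFencingSketch {
    private boolean registered = false;
    // Empty until a FeatureLevelRecord has been published to this broker.
    private Optional<Short> publishedFeatureLevel = Optional.empty();

    // Registration does not wait for the FeatureLevelRecord.
    void register() {
        registered = true;
    }

    boolean isRegistered() {
        return registered;
    }

    // Called once the FeatureLevelRecord has been published.
    void onFeatureLevelPublished(short featureLevel) {
        publishedFeatureLevel = Optional.of(featureLevel);
    }

    // Value the broker would place in the heartbeat's wantFence field
    // (assumption: no other unfencing conditions are modelled here).
    boolean wantFence() {
        return !publishedFeatureLevel.isPresent();
    }

    public static void main(String[] args) {
        BrokerFencingSketch broker = new BrokerFencingSketch();
        broker.register();
        System.out.println("registered=" + broker.isRegistered() + " wantFence=" + broker.wantFence());
        broker.onFeatureLevelPublished((short) 1);
        System.out.println("registered=" + broker.isRegistered() + " wantFence=" + broker.wantFence());
    }
}
```

   In this model the broker is registered from the start but keeps asking to 
   stay fenced until the feature level arrives.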



##########
metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java:
##########
@@ -51,7 +58,11 @@ public void finishSnapshot() {
     }
 
     public void replay(FeatureLevelRecord record) {
-        changes.put(record.name(), Optional.of(record.featureLevel()));
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            metadataVersionChange = record.featureLevel();
+        } else {
+            changes.put(record.name(), Optional.of(record.featureLevel()));
+        }
     }
 
     public void replay(RemoveFeatureLevelRecord record) {

Review Comment:
   Even though we don't use `RemoveFeatureLevelRecord` at the moment, should we 
   also reset `metadataVersionChange` to empty here?
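
   To make the question concrete, here is a rough sketch of what such a reset 
   could look like. `FeaturesDeltaSketch` is a hypothetical stand-in for 
   `FeaturesDelta`: the records are reduced to plain arguments, the feature 
   name constant is assumed to be "metadata.version", and 
   `metadataVersionChange` is modelled as an `Optional<Short>`, which may not 
   match the type used in the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for FeaturesDelta; not the PR's code.
final class FeaturesDeltaSketch {
    // Assumed value of MetadataVersion.FEATURE_NAME.
    static final String METADATA_VERSION_FEATURE = "metadata.version";

    // Optional.empty() as a value is assumed to mean "feature removed".
    private final Map<String, Optional<Short>> changes = new HashMap<>();
    private Optional<Short> metadataVersionChange = Optional.empty();

    // Mirrors replay(FeatureLevelRecord) from the hunk above.
    void replayFeatureLevel(String name, short featureLevel) {
        if (name.equals(METADATA_VERSION_FEATURE)) {
            metadataVersionChange = Optional.of(featureLevel);
        } else {
            changes.put(name, Optional.of(featureLevel));
        }
    }

    // Suggested handling for replay(RemoveFeatureLevelRecord): treat the
    // metadata version the same way instead of storing it in `changes`.
    void replayRemoveFeatureLevel(String name) {
        if (name.equals(METADATA_VERSION_FEATURE)) {
            metadataVersionChange = Optional.empty();
        } else {
            changes.put(name, Optional.empty());
        }
    }

    Optional<Short> metadataVersionChange() {
        return metadataVersionChange;
    }

    Map<String, Optional<Short>> changes() {
        return changes;
    }
}
```

   One caveat with this shape: an empty `metadataVersionChange` cannot 
   distinguish "no change" from "removed", so the PR may want a different 
   representation if that difference matters.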



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########
@@ -17,34 +17,76 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.MetadataVersion;
 
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
- * A holder class of the local node's supported feature flags.
+ * A holder class of the local node's supported feature flags as well as the ApiVersions of other nodes.
  */
 public class QuorumFeatures {
     private final int nodeId;
+    private final ApiVersions apiVersions;
     private final Map<String, VersionRange> supportedFeatures;
+    private final List<Integer> quorumNodeIds;
 
-    QuorumFeatures(int nodeId,
-                          Map<String, VersionRange> supportedFeatures) {
+    public QuorumFeatures(int nodeId,
+                          ApiVersions apiVersions,
+                          Map<String, VersionRange> supportedFeatures,
+                          List<Integer> quorumNodeIds) {
         this.nodeId = nodeId;
+        this.apiVersions = apiVersions;
         this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures);
+        this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
     }
 
     public static QuorumFeatures create(int nodeId,
-                                        Map<String, VersionRange> supportedFeatures) {
-        return new QuorumFeatures(nodeId, supportedFeatures);
+                                        ApiVersions apiVersions,
+                                        Map<String, VersionRange> supportedFeatures,
+                                        Collection<Node> quorumNodes) {
+        List<Integer> nodeIds = quorumNodes.stream().map(Node::id).collect(Collectors.toList());
+        return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, nodeIds);
     }
 
     public static Map<String, VersionRange> defaultFeatureMap() {
-        return Collections.emptyMap();
+        Map<String, VersionRange> features = new HashMap<>(1);
+        features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+        return features;
     }
 
+    Optional<VersionRange> quorumSupportedFeature(String featureName) {
+        List<VersionRange> supportedVersions = quorumNodeIds.stream()
+            .filter(node -> node != nodeId)
+            .map(node -> apiVersions.get(Integer.toString(node)))
+            .filter(Objects::nonNull)
+            .map(apiVersion -> apiVersion.supportedFeatures().get(featureName))
+            .filter(Objects::nonNull)
+            .map(supportedVersionRange -> VersionRange.of(supportedVersionRange.min(), supportedVersionRange.max()))
+            .collect(Collectors.toList());
+
+        localSupportedFeature(featureName).ifPresent(supportedVersions::add);
+
+        if (supportedVersions.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(VersionRange.of(
+                (short) supportedVersions.stream().mapToInt(VersionRange::min).max().getAsInt(),
+                (short) supportedVersions.stream().mapToInt(VersionRange::max).min().getAsInt()
+            ));
+        }
+    }
+

Review Comment:
   nit: redundant inline
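
   For readers following the `quorumSupportedFeature` hunk above: the 
   quorum-wide range is the intersection of the per-node ranges, that is, the 
   largest of the minimums paired with the smallest of the maximums. Below is 
   a small standalone sketch of that calculation. `Range` and `intersect` are 
   illustrative names, not Kafka's `VersionRange` API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Standalone illustration of the min/max intersection; not Kafka code.
final class RangeIntersectionSketch {
    // Illustrative inclusive version range.
    static final class Range {
        final short min;
        final short max;

        Range(short min, short max) {
            this.min = min;
            this.max = max;
        }
    }

    // Returns empty when no node reported a range. If the inputs do not
    // overlap, the result has min > max, which callers would need to check.
    static Optional<Range> intersect(List<Range> ranges) {
        if (ranges.isEmpty()) {
            return Optional.empty();
        }
        int highestMin = ranges.stream().mapToInt(r -> r.min).max().getAsInt();
        int lowestMax = ranges.stream().mapToInt(r -> r.max).min().getAsInt();
        return Optional.of(new Range((short) highestMin, (short) lowestMax));
    }

    public static void main(String[] args) {
        // Three nodes supporting [1,4], [2,7] and [1,5] jointly support [2,4].
        List<Range> ranges = Arrays.asList(
            new Range((short) 1, (short) 4),
            new Range((short) 2, (short) 7),
            new Range((short) 1, (short) 5));
        Range common = intersect(ranges).get();
        System.out.println("[" + common.min + "," + common.max + "]");
    }
}
```

   Note that, like the hunk, this sketch does not reject disjoint inputs; it 
   simply produces a range whose minimum exceeds its maximum.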



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
