This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 343bc995f4a KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)
343bc995f4a is described below

commit 343bc995f4a326e1bff0c6a2f1b566bd33b631ef
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Mar 7 13:46:46 2025 -0800

    KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)
    
    The Kafka controllers need to set kraft.version in their
    ApiVersionsResponse messages according to the current kraft.version
    reported by the Raft layer. Currently, they always set it to 0 instead.
    
    Also remove FeatureControlManager.latestFinalizedFeatures. It is not
    needed and it does a lot of copying.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/server/ControllerServer.scala |  6 +++--
 .../kafka/server/KRaftClusterTest.scala            | 29 ++++++++++++++++++++++
 .../kafka/controller/FeatureControlManager.java    | 11 +-------
 .../kafka/server/common/FinalizedFeatures.java     | 13 ++++++++++
 4 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 397bcf17d9f..9e3439d61c3 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, NodeToControllerChannelManager}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
 import org.apache.kafka.server.config.ConfigType
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
 import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
@@ -153,7 +153,9 @@ class ControllerServer(
       val apiVersionManager = new SimpleApiVersionManager(
         ListenerType.CONTROLLER,
         config.unstableApiVersionsEnabled,
-        () => featuresPublisher.features()
+        () => featuresPublisher.features().setFinalizedLevel(
+          KRaftVersion.FEATURE_NAME,
+          raftManager.client.kraftVersion().featureLevel())
       )
 
       //  metrics will be set to null when closing a controller, so we should recreate it for testing
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index b4515f7919d..cdc87155289 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1004,6 +1004,35 @@ class KRaftClusterTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).
+        setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.newClientPropertiesBuilder().
+        setUsingBootstrapControllers(usingBootstrapControlers).
+        build())
+      try {
+        val featureMetadata = admin.describeFeatures().featureMetadata().get()
+        assertEquals(new SupportedVersionRange(0, 1),
+          featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
+        assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
+          featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
   @Test
   def testRemoteLogManagerInstantiation(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 40eb23ce639..5bbe3b9f148 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -359,15 +359,6 @@ public class FeatureControlManager {
         return new FinalizedControllerFeatures(features, epoch);
     }
 
-    FinalizedControllerFeatures latestFinalizedFeatures() {
-        Map<String, Short> features = new HashMap<>();
-        features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
-        for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
-            features.put(entry.getKey(), entry.getValue());
-        }
-        return new FinalizedControllerFeatures(features, -1);
-    }
-
     public void replay(FeatureLevelRecord record) {
         VersionRange range = quorumFeatures.localSupportedFeature(record.name());
         if (!range.contains(record.featureLevel())) {
@@ -395,7 +386,7 @@ public class FeatureControlManager {
     }
 
     boolean isElrFeatureEnabled() {
-        return latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
+        return finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
             EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
     }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 1bf77a7c14c..4e5025dc62f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -40,4 +40,17 @@ public record FinalizedFeatures(
         this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
         this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
     }
+
+    public FinalizedFeatures setFinalizedLevel(String key, short level) {
+        if (level == (short) 0) {
+            return this;
+        } else {
+            Map<String, Short> newFinalizedFeatures = new HashMap<>(finalizedFeatures);
+            newFinalizedFeatures.put(key, level);
+            return new FinalizedFeatures(
+                metadataVersion,
+                newFinalizedFeatures,
+                finalizedFeaturesEpoch);
+        }
+    }
 }

Reply via email to