This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 21dd41b251a KAFKA-18920: The kcontrollers must set kraft.version in 
ApiVersionsResponse (#19127)
21dd41b251a is described below

commit 21dd41b251a1cc2faa0d3848ba0b973c1eba7eff
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Mar 7 13:46:46 2025 -0800

    KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse 
(#19127)
    
    The kafka controllers need to set kraft.version in their
    ApiVersionsResponse messages according to the current kraft.version
    reported by the Raft layer. Instead, currently they always set it to 0.
    
    Also remove FeatureControlManager.latestFinalizedFeatures. It is not
    needed and it does a lot of copying.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/server/ControllerServer.scala |  7 ++++--
 .../kafka/server/KRaftClusterTest.scala            | 28 ++++++++++++++++++++++
 .../kafka/server/common/FinalizedFeatures.java     | 17 +++++++++++++
 3 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 0905215f9a2..11a0e2bbf86 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -44,9 +44,10 @@ import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.{CredentialProvider, PasswordEncoder}
 import org.apache.kafka.server.NodeToControllerChannelManager
 import org.apache.kafka.server.authorizer.Authorizer
-import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.config.ConfigType
+import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, 
LinuxIoMetricsCollector}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -176,7 +177,9 @@ class ControllerServer(
         ListenerType.CONTROLLER,
         config.unstableApiVersionsEnabled,
         config.migrationEnabled,
-        () => featuresPublisher.features()
+        () => featuresPublisher.features().setFinalizedLevel(
+          KRaftVersion.FEATURE_NAME,
+          raftManager.client.kraftVersion().featureLevel())
       )
 
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 18402e54083..63d1b186e0b 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1047,6 +1047,34 @@ class KRaftClusterTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.newClientPropertiesBuilder().
+        setUsingBootstrapControllers(usingBootstrapControlers).
+        build())
+      try {
+        val featureMetadata = admin.describeFeatures().featureMetadata().get()
+        assertEquals(new SupportedVersionRange(0, 1),
+          featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
+        assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
+          featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
   @Test
   def testRemoteLogManagerInstantiation(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index de78a3a72a8..74fce1f81ff 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -25,6 +25,7 @@ public final class FinalizedFeatures {
     private final MetadataVersion metadataVersion;
     private final Map<String, Short> finalizedFeatures;
     private final long finalizedFeaturesEpoch;
+    private final boolean kraftMode;
 
     public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
         return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
@@ -39,6 +40,7 @@ public final class FinalizedFeatures {
         this.metadataVersion = metadataVersion;
         this.finalizedFeatures = new HashMap<>(finalizedFeatures);
         this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        this.kraftMode = kraftMode;
         // In KRaft mode, we always include the metadata version in the 
features map.
         // In ZK mode, we never include it.
         if (kraftMode) {
@@ -82,4 +84,19 @@ public final class FinalizedFeatures {
                 ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
                 ")";
     }
+
+    public FinalizedFeatures setFinalizedLevel(String key, short level) {
+        if (level == (short) 0) {
+            return this;
+        } else {
+            Map<String, Short> newFinalizedFeatures = new 
HashMap<>(finalizedFeatures);
+            newFinalizedFeatures.put(key, level);
+            return new FinalizedFeatures(
+                metadataVersion,
+                newFinalizedFeatures,
+                finalizedFeaturesEpoch,
+                kraftMode
+            );
+        }
+    }
 }

Reply via email to