This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 21dd41b251a KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)
21dd41b251a is described below
commit 21dd41b251a1cc2faa0d3848ba0b973c1eba7eff
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Mar 7 13:46:46 2025 -0800
KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse (#19127)
The kafka controllers need to set kraft.version in their
ApiVersionsResponse messages according to the current kraft.version
reported by the Raft layer. Instead, currently they always set it to 0.
Also remove FeatureControlManager.latestFinalizedFeatures. It is not
needed and it does a lot of copying.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../main/scala/kafka/server/ControllerServer.scala | 7 ++++--
.../kafka/server/KRaftClusterTest.scala | 28 ++++++++++++++++++++++
.../kafka/server/common/FinalizedFeatures.java | 17 +++++++++++++
3 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 0905215f9a2..11a0e2bbf86 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -44,9 +44,10 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.{CredentialProvider, PasswordEncoder}
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -176,7 +177,9 @@ class ControllerServer(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
config.migrationEnabled,
- () => featuresPublisher.features()
+ () => featuresPublisher.features().setFinalizedLevel(
+ KRaftVersion.FEATURE_NAME,
+ raftManager.client.kraftVersion().featureLevel())
)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 18402e54083..63d1b186e0b 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1047,6 +1047,34 @@ class KRaftClusterTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.newClientPropertiesBuilder().
+ setUsingBootstrapControllers(usingBootstrapControlers).
+ build())
+ try {
+ val featureMetadata = admin.describeFeatures().featureMetadata().get()
+ assertEquals(new SupportedVersionRange(0, 1),
+ featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
+ assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
+ featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
@Test
def testRemoteLogManagerInstantiation(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index de78a3a72a8..74fce1f81ff 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -25,6 +25,7 @@ public final class FinalizedFeatures {
private final MetadataVersion metadataVersion;
private final Map<String, Short> finalizedFeatures;
private final long finalizedFeaturesEpoch;
+ private final boolean kraftMode;
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
return new FinalizedFeatures(version, Collections.emptyMap(), -1, true);
@@ -39,6 +40,7 @@ public final class FinalizedFeatures {
this.metadataVersion = metadataVersion;
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+ this.kraftMode = kraftMode;
// In KRaft mode, we always include the metadata version in the features map.
// In ZK mode, we never include it.
if (kraftMode) {
@@ -82,4 +84,19 @@ public final class FinalizedFeatures {
", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
")";
}
+
+ public FinalizedFeatures setFinalizedLevel(String key, short level) {
+ if (level == (short) 0) {
+ return this;
+ } else {
+ Map<String, Short> newFinalizedFeatures = new HashMap<>(finalizedFeatures);
+ newFinalizedFeatures.put(key, level);
+ return new FinalizedFeatures(
+ metadataVersion,
+ newFinalizedFeatures,
+ finalizedFeaturesEpoch,
+ kraftMode
+ );
+ }
+ }
}