This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 343bc995f4a KAFKA-18920: The kcontrollers must set kraft.version in
ApiVersionsResponse (#19127)
343bc995f4a is described below
commit 343bc995f4a326e1bff0c6a2f1b566bd33b631ef
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Mar 7 13:46:46 2025 -0800
KAFKA-18920: The kcontrollers must set kraft.version in ApiVersionsResponse
(#19127)
The kafka controllers need to set kraft.version in their
ApiVersionsResponse messages according to the current kraft.version
reported by the Raft layer. Instead, currently they always set it to 0.
Also remove FeatureControlManager.latestFinalizedFeatures. It is not
needed and it does a lot of copying.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../main/scala/kafka/server/ControllerServer.scala | 6 +++--
.../kafka/server/KRaftClusterTest.scala | 29 ++++++++++++++++++++++
.../kafka/controller/FeatureControlManager.java | 11 +-------
.../kafka/server/common/FinalizedFeatures.java | 13 ++++++++++
4 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 397bcf17d9f..9e3439d61c3 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
-import org.apache.kafka.server.common.{ApiMessageAndVersion,
NodeToControllerChannelManager}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion,
NodeToControllerChannelManager}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics,
LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
@@ -153,7 +153,9 @@ class ControllerServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
- () => featuresPublisher.features()
+ () => featuresPublisher.features().setFinalizedLevel(
+ KRaftVersion.FEATURE_NAME,
+ raftManager.client.kraftVersion().featureLevel())
)
// metrics will be set to null when closing a controller, so we should
recreate it for testing
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index b4515f7919d..cdc87155289 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1004,6 +1004,35 @@ class KRaftClusterTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testDescribeKRaftVersion(usingBootstrapControlers: Boolean): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).
+ setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.newClientPropertiesBuilder().
+ setUsingBootstrapControllers(usingBootstrapControlers).
+ build())
+ try {
+ val featureMetadata = admin.describeFeatures().featureMetadata().get()
+ assertEquals(new SupportedVersionRange(0, 1),
+ featureMetadata.supportedFeatures().get(KRaftVersion.FEATURE_NAME))
+ assertEquals(new FinalizedVersionRange(1.toShort, 1.toShort),
+ featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME))
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
@Test
def testRemoteLogManagerInstantiation(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 40eb23ce639..5bbe3b9f148 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -359,15 +359,6 @@ public class FeatureControlManager {
return new FinalizedControllerFeatures(features, epoch);
}
- FinalizedControllerFeatures latestFinalizedFeatures() {
- Map<String, Short> features = new HashMap<>();
- features.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get().featureLevel());
- for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
- features.put(entry.getKey(), entry.getValue());
- }
- return new FinalizedControllerFeatures(features, -1);
- }
-
public void replay(FeatureLevelRecord record) {
VersionRange range =
quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
@@ -395,7 +386,7 @@ public class FeatureControlManager {
}
boolean isElrFeatureEnabled() {
- return
latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 0) >=
+ return
finalizedVersions.getOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 0) >=
EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 1bf77a7c14c..4e5025dc62f 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -40,4 +40,17 @@ public record FinalizedFeatures(
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel());
}
+
+ public FinalizedFeatures setFinalizedLevel(String key, short level) {
+ if (level == (short) 0) {
+ return this;
+ } else {
+ Map<String, Short> newFinalizedFeatures = new
HashMap<>(finalizedFeatures);
+ newFinalizedFeatures.put(key, level);
+ return new FinalizedFeatures(
+ metadataVersion,
+ newFinalizedFeatures,
+ finalizedFeaturesEpoch);
+ }
+ }
}