This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 4990c970c19 KAFKA-18979; Report correct kraft.version in ApiVersions
(#19205)
4990c970c19 is described below
commit 4990c970c1941eed6ce3f38d8991d4195d3777d3
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu Mar 13 18:39:24 2025 -0400
KAFKA-18979; Report correct kraft.version in ApiVersions (#19205)
Skip kraft.version when applying FeatureLevelRecord records. The
kraft.version is stored as control records and not as metadata records. This
solution has the benefits of removing from snapshots any FeatureLevelRecord for
kraft.version that was incorrectly written to the log and allows ApiVersions to
report the correct finalized kraft.version.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/controller/FeatureControlManager.java | 5 ++++
.../java/org/apache/kafka/image/FeaturesDelta.java | 6 +++-
.../controller/FeatureControlManagerTest.java | 24 +++++++++++++++
.../org/apache/kafka/image/FeaturesDeltaTest.java | 8 +++++
server-common/bin/test/log4j2.yaml | 35 ++++++++++++++++++++++
.../kafka/server/common/FinalizedFeatures.java | 11 ++++++-
.../kafka/server/common/FinalizedFeaturesTest.java | 18 +++++++++++
7 files changed, 105 insertions(+), 2 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 5bbe3b9f148..50c069b9789 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -27,6 +27,7 @@ import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -369,6 +370,10 @@ public class FeatureControlManager {
MetadataVersion mv =
MetadataVersion.fromFeatureLevel(record.featureLevel());
metadataVersion.set(mv);
log.info("Replayed a FeatureLevelRecord setting metadata.version
to {}", mv);
+ } else if (record.name().equals(KRaftVersion.FEATURE_NAME)) {
+ // KAFKA-18979 - Skip any feature level record for kraft.version.
This has two benefits:
+ // 1. It removes from snapshots any FeatureLevelRecord for
kraft.version that was incorrectly written to the log
+ // 2. Allows ApiVersions to report the correct finalized
kraft.version
} else {
if (record.featureLevel() == 0) {
finalizedVersions.remove(record.name());
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 3a3cbefe448..8812bd2bbe3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
@@ -30,7 +31,6 @@ import java.util.Optional;
* Represents changes to the cluster in the metadata image.
*/
public final class FeaturesDelta {
- private static final short MINIMUM_PERSISTED_FEATURE_LEVEL = 4;
private final FeaturesImage image;
private final Map<String, Optional<Short>> changes = new HashMap<>();
@@ -66,6 +66,10 @@ public final class FeaturesDelta {
+ "please ensure the metadata version is set to " +
MetadataVersion.MINIMUM_VERSION + " (or higher) before "
+ "updating the software version. The metadata version
can be updated via the `kafka-features` command-line tool.", e);
}
+ } else if (record.name().equals(KRaftVersion.FEATURE_NAME)) {
+ // KAFKA-18979 - Skip any feature level record for kraft.version.
This has two benefits:
+ // 1. It removes from snapshots any FeatureLevelRecord for
kraft.version that was incorrectly written to the log
+ // 2. Allows ApiVersions to report the correct finalized
kraft.version
} else {
if (record.featureLevel() == 0) {
changes.put(record.name(), Optional.empty());
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 2d0bc4c860e..5d814879922 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionTestUtils;
import org.apache.kafka.server.common.TestFeatureVersion;
@@ -152,6 +153,29 @@ public class FeatureControlManagerTest {
manager.finalizedFeatures(123));
}
+ @Test
+ public void testReplayKraftVersionFeatureLevel() {
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+ snapshotRegistry.idempotentCreateSnapshot(-1);
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setQuorumFeatures(features("foo", 1, 2)).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
+ // Replay a kraft.version feature level record and shot that the level
doesn't get updated
+ manager.replay(new
FeatureLevelRecord().setName(KRaftVersion.FEATURE_NAME).setFeatureLevel(KRaftVersion.LATEST_PRODUCTION.featureLevel()));
+ snapshotRegistry.idempotentCreateSnapshot(123);
+ assertEquals(
+ new FinalizedControllerFeatures(
+ versionMap("metadata.version",
MetadataVersion.MINIMUM_VERSION.featureLevel()), 123
+ ),
+ manager.finalizedFeatures(123)
+ );
+ }
+
static ClusterFeatureSupportDescriber
createFakeClusterFeatureSupportDescriber(
List<Map.Entry<Integer, Map<String, VersionRange>>> brokerRanges,
List<Map.Entry<Integer, Map<String, VersionRange>>> controllerRanges
diff --git
a/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
b/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
index 7eb1e5d4fc6..ff5deba81ca 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
@@ -18,12 +18,14 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionTestUtils;
import org.junit.jupiter.api.Test;
import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,4 +42,10 @@ class FeaturesDeltaTest {
"Expected substring missing from exception message: " +
exception.getMessage());
}
+ @Test
+ public void testReplayKraftVersionFeatureLevel() {
+ var featuresDelta = new FeaturesDelta(new FeaturesImage(emptyMap(),
MetadataVersion.MINIMUM_VERSION));
+ featuresDelta.replay(new
FeatureLevelRecord().setName(KRaftVersion.FEATURE_NAME).setFeatureLevel(KRaftVersion.LATEST_PRODUCTION.featureLevel()));
+ assertEquals(emptyMap(), featuresDelta.changes());
+ }
}
diff --git a/server-common/bin/test/log4j2.yaml
b/server-common/bin/test/log4j2.yaml
new file mode 100644
index 00000000000..be546a18b55
--- /dev/null
+++ b/server-common/bin/test/log4j2.yaml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Configuration:
+ Properties:
+ Property:
+ - name: "logPattern"
+ value: "[%d] %p %m (%c:%L)%n"
+
+ Appenders:
+ Console:
+ name: STDOUT
+ PatternLayout:
+ pattern: "${logPattern}"
+
+ Loggers:
+ Root:
+ level: INFO
+ AppenderRef:
+ - ref: STDOUT
+ Logger:
+ - name: org.apache.kafka
+ level: INFO
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 923632f6d1e..55101cc986d 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -96,7 +96,16 @@ public final class FinalizedFeatures {
public FinalizedFeatures setFinalizedLevel(String key, short level) {
if (level == (short) 0) {
- return this;
+ if (finalizedFeatures.containsKey(key)) {
+ Map<String, Short> newFinalizedFeatures = new
HashMap<>(finalizedFeatures);
+ newFinalizedFeatures.remove(key);
+ return new FinalizedFeatures(
+ metadataVersion,
+ newFinalizedFeatures,
+ finalizedFeaturesEpoch);
+ } else {
+ return this;
+ }
} else {
Map<String, Short> newFinalizedFeatures = new
HashMap<>(finalizedFeatures);
newFinalizedFeatures.put(key, level);
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
index 9b889250029..4e3d6b0b139 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
@@ -47,4 +47,22 @@ class FinalizedFeaturesTest {
finalizedFeatures.finalizedFeatures().get("foo"));
assertEquals(1, finalizedFeatures.finalizedFeatures().size());
}
+
+ @Test
+ public void testSetFinalizedLevel() {
+ FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
+ MINIMUM_VERSION,
+ Collections.singletonMap("foo", (short) 2),
+ 123,
+ true
+ );
+
+ // Override an existing finalized feature version to 0
+ FinalizedFeatures removedFeatures =
finalizedFeatures.setFinalizedLevel("foo", (short) 0);
+ assertNull(removedFeatures.finalizedFeatures().get("foo"));
+
+ // Override a missing finalized feature version to 0
+ FinalizedFeatures sameFeatures =
removedFeatures.setFinalizedLevel("foo", (short) 0);
+ assertEquals(sameFeatures.finalizedFeatures(),
removedFeatures.finalizedFeatures());
+ }
}