This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d04efca493b KAFKA-18979; Report correct kraft.version in ApiVersions
(#19205)
d04efca493b is described below
commit d04efca493b4a05e86570b05696206404695f1f7
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu Mar 13 18:39:24 2025 -0400
KAFKA-18979; Report correct kraft.version in ApiVersions (#19205)
Skip kraft.version when applying FeatureLevelRecord records. The
kraft.version is stored in control records, not in metadata records. This
solution has two benefits: it removes from snapshots any FeatureLevelRecord for
kraft.version that was incorrectly written to the log, and it allows ApiVersions
to report the correct finalized kraft.version.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/controller/FeatureControlManager.java | 5 +++++
.../java/org/apache/kafka/image/FeaturesDelta.java | 6 +++++-
.../controller/FeatureControlManagerTest.java | 24 ++++++++++++++++++++++
.../org/apache/kafka/image/FeaturesDeltaTest.java | 8 ++++++++
.../kafka/server/common/FinalizedFeatures.java | 11 +++++++++-
.../kafka/server/common/FinalizedFeaturesTest.java | 18 ++++++++++++++++
6 files changed, 70 insertions(+), 2 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index c9ea02a99e4..92c70f44472 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -27,6 +27,7 @@ import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -370,6 +371,10 @@ public class FeatureControlManager {
MetadataVersion mv =
MetadataVersion.fromFeatureLevel(record.featureLevel());
metadataVersion.set(Optional.of(mv));
log.info("Replayed a FeatureLevelRecord setting metadata.version
to {}", mv);
+ } else if (record.name().equals(KRaftVersion.FEATURE_NAME)) {
+ // KAFKA-18979 - Skip any feature level record for kraft.version.
This has two benefits:
+ // 1. It removes from snapshots any FeatureLevelRecord for
kraft.version that was incorrectly written to the log
+ // 2. Allows ApiVersions to report the correct finalized
kraft.version
} else {
if (record.featureLevel() == 0) {
finalizedVersions.remove(record.name());
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 3a3cbefe448..8812bd2bbe3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
@@ -30,7 +31,6 @@ import java.util.Optional;
* Represents changes to the cluster in the metadata image.
*/
public final class FeaturesDelta {
- private static final short MINIMUM_PERSISTED_FEATURE_LEVEL = 4;
private final FeaturesImage image;
private final Map<String, Optional<Short>> changes = new HashMap<>();
@@ -66,6 +66,10 @@ public final class FeaturesDelta {
+ "please ensure the metadata version is set to " +
MetadataVersion.MINIMUM_VERSION + " (or higher) before "
+ "updating the software version. The metadata version
can be updated via the `kafka-features` command-line tool.", e);
}
+ } else if (record.name().equals(KRaftVersion.FEATURE_NAME)) {
+ // KAFKA-18979 - Skip any feature level record for kraft.version.
This has two benefits:
+ // 1. It removes from snapshots any FeatureLevelRecord for
kraft.version that was incorrectly written to the log
+ // 2. Allows ApiVersions to report the correct finalized
kraft.version
} else {
if (record.featureLevel() == 0) {
changes.put(record.name(), Optional.empty());
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index ee2f464ca5c..52e06a0e505 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionTestUtils;
import org.apache.kafka.server.common.TestFeatureVersion;
@@ -153,6 +154,29 @@ public class FeatureControlManagerTest {
manager.finalizedFeatures(123));
}
+ @Test
+ public void testReplayKraftVersionFeatureLevel() {
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+
+ snapshotRegistry.idempotentCreateSnapshot(-1);
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setQuorumFeatures(features("foo", 1, 2)).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
+ // Replay a kraft.version feature level record and show that the level
doesn't get updated
+ manager.replay(new
FeatureLevelRecord().setName(KRaftVersion.FEATURE_NAME).setFeatureLevel(KRaftVersion.LATEST_PRODUCTION.featureLevel()));
+ snapshotRegistry.idempotentCreateSnapshot(123);
+ assertEquals(
+ new FinalizedControllerFeatures(
+ versionMap("metadata.version",
MetadataVersion.MINIMUM_VERSION.featureLevel()), 123
+ ),
+ manager.finalizedFeatures(123)
+ );
+ }
+
static ClusterFeatureSupportDescriber
createFakeClusterFeatureSupportDescriber(
List<Map.Entry<Integer, Map<String, VersionRange>>> brokerRanges,
List<Map.Entry<Integer, Map<String, VersionRange>>> controllerRanges
diff --git
a/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
b/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
index 7eb1e5d4fc6..ff5deba81ca 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesDeltaTest.java
@@ -18,12 +18,14 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionTestUtils;
import org.junit.jupiter.api.Test;
import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,4 +42,10 @@ class FeaturesDeltaTest {
"Expected substring missing from exception message: " +
exception.getMessage());
}
+ @Test
+ public void testReplayKraftVersionFeatureLevel() {
+ var featuresDelta = new FeaturesDelta(new FeaturesImage(emptyMap(),
MetadataVersion.MINIMUM_VERSION));
+ featuresDelta.replay(new
FeatureLevelRecord().setName(KRaftVersion.FEATURE_NAME).setFeatureLevel(KRaftVersion.LATEST_PRODUCTION.featureLevel()));
+ assertEquals(emptyMap(), featuresDelta.changes());
+ }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 4e5025dc62f..29634c7a3bd 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -43,7 +43,16 @@ public record FinalizedFeatures(
public FinalizedFeatures setFinalizedLevel(String key, short level) {
if (level == (short) 0) {
- return this;
+ if (finalizedFeatures.containsKey(key)) {
+ Map<String, Short> newFinalizedFeatures = new
HashMap<>(finalizedFeatures);
+ newFinalizedFeatures.remove(key);
+ return new FinalizedFeatures(
+ metadataVersion,
+ newFinalizedFeatures,
+ finalizedFeaturesEpoch);
+ } else {
+ return this;
+ }
} else {
Map<String, Short> newFinalizedFeatures = new
HashMap<>(finalizedFeatures);
newFinalizedFeatures.put(key, level);
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
index e0021155135..9a82cc33d03 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
class FinalizedFeaturesTest {
@Test
@@ -36,4 +37,21 @@ class FinalizedFeaturesTest {
finalizedFeatures.finalizedFeatures().get("foo"));
assertEquals(2, finalizedFeatures.finalizedFeatures().size());
}
+
+ @Test
+ public void testSetFinalizedLevel() {
+ FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
+ MINIMUM_VERSION,
+ Collections.singletonMap("foo", (short) 2),
+ 123
+ );
+
+ // Override an existing finalized feature version to 0
+ FinalizedFeatures removedFeatures =
finalizedFeatures.setFinalizedLevel("foo", (short) 0);
+ assertNull(removedFeatures.finalizedFeatures().get("foo"));
+
+ // Override a missing finalized feature version to 0
+ FinalizedFeatures sameFeatures =
removedFeatures.setFinalizedLevel("foo", (short) 0);
+ assertEquals(sameFeatures.finalizedFeatures(),
removedFeatures.finalizedFeatures());
+ }
}