junrao commented on code in PR #18845:
URL: https://github.com/apache/kafka/pull/18845#discussion_r1953309570


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1650,13 +1642,7 @@ private QuorumController(
      */
     private void registerWriteNoOpRecord(long maxIdleIntervalNs) {
         periodicControl.registerTask(new PeriodicTask("writeNoOpRecord",
-            () -> {
-                ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1);
-                if (featureControl.metadataVersion().isNoOpRecordSupported()) {
-                    records.add(new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
-                }
-                return ControllerResult.of(records, false);
-            },
+            () -> ControllerResult.of(List.of(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)), false),

Review Comment:
   The javadoc above still says `if the MetadataVersion supports it.`; that
needs to be updated too, since the `NoOpRecord` is now written unconditionally.



##########
metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java:
##########
@@ -30,31 +32,31 @@
 @Timeout(value = 40)
 public class MetadataVersionChangeTest {
 
-    private static final MetadataVersionChange CHANGE_3_0_IV1_TO_3_3_IV0 =
-        new MetadataVersionChange(IBP_3_0_IV1, IBP_3_3_IV0);
+    private static final MetadataVersionChange CHANGE_MINUMUM_TO_LATEST =
+        new MetadataVersionChange(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction());
 
-    private static final MetadataVersionChange CHANGE_3_3_IV0_TO_3_0_IV1 =
-        new MetadataVersionChange(IBP_3_3_IV0, IBP_3_0_IV1);
+    private static final MetadataVersionChange CHANGE_3_6_IV0_TO_3_3_IV3 =

Review Comment:
   Could we change this to use `MetadataVersion.MINIMUM_VERSION` and
`MetadataVersion.latestProduction()` too?
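
A minimal, self-contained sketch of what this suggestion could look like. `Version` and `Change` are hypothetical stand-ins for `MetadataVersion` and `MetadataVersionChange` (they are not the Kafka classes), and the name `CHANGE_LATEST_TO_MINIMUM` is only a proposal. The point is that both constants are derived from the version bounds, so neither needs editing when the minimum or latest version moves.

```java
final class VersionChangeSketch {
    // Hypothetical stand-in for MetadataVersion; the constants and their order are illustrative only.
    enum Version {
        V_MIN, V_MID, V_LATEST;

        // Stand-in for MetadataVersion.MINIMUM_VERSION.
        static Version minimum() { return values()[0]; }

        // Stand-in for MetadataVersion.latestProduction().
        static Version latest() { return values()[values().length - 1]; }
    }

    // Hypothetical stand-in for MetadataVersionChange.
    static final class Change {
        final Version oldVersion;
        final Version newVersion;

        Change(Version oldVersion, Version newVersion) {
            this.oldVersion = oldVersion;
            this.newVersion = newVersion;
        }

        boolean isUpgrade() { return oldVersion.compareTo(newVersion) < 0; }

        boolean isDowngrade() { return newVersion.compareTo(oldVersion) < 0; }
    }

    // Both test constants are expressed in terms of the bounds rather than fixed versions.
    static final Change CHANGE_MINIMUM_TO_LATEST = new Change(Version.minimum(), Version.latest());
    static final Change CHANGE_LATEST_TO_MINIMUM = new Change(Version.latest(), Version.minimum());
}
```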



##########
metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java:
##########
@@ -58,7 +59,14 @@ public void finishSnapshot() {
 
     public void replay(FeatureLevelRecord record) {
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
-            metadataVersionChange = MetadataVersion.fromFeatureLevel(record.featureLevel());
+            // Support for the `metadata.version` feature flag was added in IBP_3_3_IV0, so it's possible (but unlikely) that we read
+            // records with a feature level that is no longer supported for clusters that used a pre-release version of 3.3.0.
+            // We automatically fallback to IBP_3_3_IV3 in that case. We use explicit versions instead of `MINIMUM_VERSION` because
+            // we want to force an explicit decision if we change `MetadataVersion.MINIMUM_VERSION` in the future.
+            if (record.featureLevel() >= MINIMUM_PERSISTED_FEATURE_LEVEL && record.featureLevel() <= MetadataVersion.IBP_3_3_IV3.featureLevel())
+                metadataVersionChange = MetadataVersion.IBP_3_3_IV3;

Review Comment:
   Since we only support upgrading to 4.0 from an officially released 3.3, we
could probably just fail on MV 3_3_IV0 through 3_3_IV2, the same way we do for
other unsupported MVs.
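
To make the alternative concrete, here is a rough sketch of the stricter behavior: reject any feature level below the minimum supported one instead of mapping it to IBP_3_3_IV3. The class name, method name, and exception type are hypothetical, and the numeric level is an assumed placeholder for `MetadataVersion.IBP_3_3_IV3.featureLevel()`; the real code would presumably go through `MetadataVersion.fromFeatureLevel(...)` and whatever exception it already throws.

```java
final class StrictMetadataVersionSketch {
    // Assumed placeholder for MetadataVersion.IBP_3_3_IV3.featureLevel(); the value is for illustration only.
    static final short MIN_SUPPORTED_FEATURE_LEVEL = 7;

    // Returns the level unchanged if it is supported; fails fast otherwise.
    // There is no silent fallback for pre-release 3.3 levels.
    static short requireSupported(short featureLevel) {
        if (featureLevel < MIN_SUPPORTED_FEATURE_LEVEL) {
            throw new IllegalArgumentException(
                "Unsupported metadata.version feature level " + featureLevel
                    + "; the minimum supported level is " + MIN_SUPPORTED_FEATURE_LEVEL);
        }
        return featureLevel;
    }
}
```

With this shape, a record written by a pre-release 3.3 build is treated like any other unsupported MV, so `replay` needs no special case.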



##########
server-common/src/main/java/org/apache/kafka/server/common/UnitTestFeatureVersion.java:
##########
@@ -321,7 +321,7 @@ public Map<String, Short> dependencies() {
      * The feature is used to test the default value has MV dependency that is behind the bootstrap MV.
      */
     public enum FV7 implements FeatureVersion {
-        UT_FV7_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())),
+        UT_FV7_0(0, MetadataVersion.MINIMUM_VERSION, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_7_IV0.featureLevel())),

Review Comment:
   The test passes since the error message is only used if an expected 
exception is not thrown. 



##########
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##########
@@ -56,7 +53,7 @@ public FeaturesImage(
 
     public boolean isEmpty() {
         return finalizedVersions.isEmpty() &&
-            metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION);
+            metadataVersion.equals(MetadataVersion.MINIMUM_VERSION);

Review Comment:
   `isEmpty()` is only used in `MetadataLoaderTest`, so we can address this in
a follow-up JIRA. But I am not sure whether `isEmpty()` will ever be false,
given that the metadata version is required in the metadata log and probably
in the snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

