rondagostino commented on code in PR #14860:
URL: https://github.com/apache/kafka/pull/14860#discussion_r1409696104


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -59,10 +59,14 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace, 
Option(config.get.interBrokerProtocolVersionString))
+          val metadataVersion = getMetadataVersion(namespace,
+            
Option(config.get.originals.get(KafkaConfig.InterBrokerProtocolVersionProp)).map(_.toString))
           if (!metadataVersion.isKRaftSupported) {
             throw new TerseFailure(s"Must specify a valid KRaft metadata 
version of at least 3.0.")
           }
+          if (!metadataVersion.isProduction()) {
+            throw new TerseFailure(s"Metadata version ${metadataVersion} is 
not ready for production use yet.")
+          }

Review Comment:
   Do we need an override flag/option, do you think?



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -386,4 +418,21 @@ public void testOffsetCommitValueVersion(MetadataVersion 
metadataVersion) {
     public void 
testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion 
metadataVersion) {
         assertEquals((short) 1, 
metadataVersion.offsetCommitValueVersion(true));
     }
+
+    @Test
+    public void assertLatestProductionIsLessThanLatest() {
+        assertTrue(LATEST_PRODUCTION.ordinal() < 
MetadataVersion.latest().ordinal(),
+            "Expected LATEST_PRODUCTION " + LATEST_PRODUCTION +
+            " to be less than the latest of " + MetadataVersion.latest());
+    }

Review Comment:
   Will this always be true?  It seems that if we want to make the latest one 
production-ready then we will have to add a new MetadataVersion after it that 
isn't.  Is this what we want?



##########
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##########
@@ -42,13 +42,13 @@
       "about": "The epoch of the partition leader." },
     { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
       "about": "An epoch that gets incremented each time we change anything in 
the partition." },
+    { "name": "Directories", "type": "[]uuid", "versions": "1+",
+      "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."},
     { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 1,
+      "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 1,

Review Comment:
   `s/nullableVersions": "1+",/nullableVersions": "2+",/` (unless this is done 
this way on purpose, in which case could you explain why to help me understand?)



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -329,19 +336,44 @@ public void 
testIsDelegationTokenSupported(MetadataVersion metadataVersion) {
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class)
     public void testIsElrSupported(MetadataVersion metadataVersion) {
-        assertEquals(metadataVersion.equals(IBP_3_7_IV1),
-                metadataVersion.isElrSupported());
-        short expectPartitionRecordVersion = 
metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-        assertEquals(expectPartitionRecordVersion, 
metadataVersion.partitionRecordVersion());
-        short expectPartitionChangeRecordVersion = 
metadataVersion.equals(IBP_3_7_IV1) ? (short) 1 : (short) 0;
-        assertEquals(expectPartitionChangeRecordVersion, 
metadataVersion.partitionChangeRecordVersion());
+        assertEquals(metadataVersion.equals(IBP_3_7_IV3), 
metadataVersion.isElrSupported());

Review Comment:
   `s/metadataVersion.equals/metadataVersion.isAtLeast`
   
   We should probably add a method 
`testIsDirectoryAssignmentSupported(MetadataVersion metadataVersion)`



##########
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##########
@@ -42,13 +42,13 @@
       "about": "The epoch of the partition leader." },
     { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
       "about": "An epoch that gets incremented each time we change anything in 
the partition." },
+    { "name": "Directories", "type": "[]uuid", "versions": "1+",
+      "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."},
     { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 1,
+      "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 1,
       "about": "The eligible leader replicas of this partition." },
     { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
-      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 2,
-      "about": "The last known eligible leader replicas of this partition." },
-    { "name": "Directories", "type": "[]uuid", "versions": "2+",
-      "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."}
+      "versions": "2+", "nullableVersions": "1+", "taggedVersions": "2+", 
"tag": 2,

Review Comment:
   `s/nullableVersions": "1+",/nullableVersions": "2+",/` (same caveat as above)



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########
@@ -58,7 +58,7 @@ public static Map<String, VersionRange> defaultFeatureMap() {
         Map<String, VersionRange> features = new HashMap<>(1);
         features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
                 MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
-                MetadataVersion.latest().featureLevel()));
+                MetadataVersion.LATEST_PRODUCTION.featureLevel()));

Review Comment:
   Should add a test to lock this is.



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -203,7 +203,7 @@ Found problem:
   def testDefaultMetadataVersion(): Unit = {
     val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
     val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = 
None)
-    assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
+    assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), 
mv.featureLevel(),
       "Expected the default metadata.version to be the latest version")

Review Comment:
   `s/to be the latest version/to be the latest production-ready version/`



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2191,7 +2191,7 @@ class PartitionTest extends AbstractPartitionTest {
     val partition = new Partition(
       topicPartition,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
-      interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV1,
+      interBrokerProtocolVersion = MetadataVersion.IBP_3_7_IV2,

Review Comment:
   Curious why this changed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to