This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebaa108967f KAFKA-16968: Introduce 3.8-IV0, 3.9-IV0, 3.9-IV1
ebaa108967f is described below

commit ebaa108967fca35b2fce1d9052e3641046e510fd
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Thu Jun 27 14:03:03 2024 -0700

    KAFKA-16968: Introduce 3.8-IV0, 3.9-IV0, 3.9-IV1
    
    Create 3 new metadata versions:
    
    - 3.8-IV0, for the upcoming 3.8 release.
    - 3.9-IV0, to add support for KIP-1005.
    - 3.9-IV1, as the new release vehicle for KIP-966.
    
    Create ListOffsetRequest v9, which will be used in 3.9-IV0 to support 
KIP-1005. v9 is currently an unstable API version.
    
    Reviewers: Jun Rao <jun...@gmail.com>, Justine Olshan <jols...@confluent.io>
---
 .../common/message/ListOffsetsRequest.json         |  5 ++++-
 .../common/message/ListOffsetsResponse.json        |  4 +++-
 .../java/kafka/test/ClusterTestExtensionsTest.java |  2 +-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  8 +++++---
 .../unit/kafka/server/ApiVersionsRequestTest.scala | 17 ++++++++-------
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 11 +++++++++-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |  2 +-
 .../kafka/server/SaslApiVersionsRequestTest.scala  |  8 ++++++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  1 +
 .../controller/PartitionChangeBuilderTest.java     | 14 ++++++-------
 .../kafka/controller/QuorumControllerTest.java     | 16 +++++++--------
 .../kafka/metadata/PartitionRegistrationTest.java  |  4 ++--
 .../kafka/server/common/MetadataVersion.java       | 24 +++++++++++++++++-----
 .../kafka/server/common/TestFeatureVersion.java    |  4 ++--
 .../apache/kafka/server/common/FeaturesTest.java   |  2 +-
 .../kafka/server/common/MetadataVersionTest.java   | 11 +++++++++-
 .../storage/utils/TieredStorageTestUtils.java      |  3 +++
 .../org/apache/kafka/tools/FeatureCommandTest.java |  7 ++++---
 18 files changed, 97 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json 
b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 5591c5f760a..2b30c974c99 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -34,9 +34,12 @@
   // Version 7 enables listing offsets by max timestamp (KIP-734).
   //
   // Version 8 enables listing offsets by local log start offset (KIP-405).
-  "validVersions": "0-8",
+  //
+  // Version 9 enables listing offsets by last tiered offset (KIP-1005).
+  "validVersions": "0-9",
   "deprecatedVersions": "0",
   "flexibleVersions": "6+",
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The broker ID of the requester, or -1 if this request is being 
made by a normal consumer." },
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json 
b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 76177a60a9d..05e164d9128 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -34,7 +34,9 @@
   //
   // Version 8 enables listing offsets by local log start offset.
   // This is the earliest log start offset in the local log. (KIP-405).
-  "validVersions": "0-8",
+  //
+  // Version 9 enables listing offsets by last tiered offset (KIP-1005).
+  "validVersions": "0-9",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", 
"ignorable": true,
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index c698632094e..498936196ed 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -187,7 +187,7 @@ public class ClusterTestExtensionsTest {
 
     @ClusterTest
     public void testDefaults(ClusterInstance clusterInstance) {
-        Assertions.assertEquals(MetadataVersion.IBP_4_0_IV0, 
clusterInstance.config().metadataVersion());
+        Assertions.assertEquals(MetadataVersion.latestTesting(), 
clusterInstance.config().metadataVersion());
     }
 
     @ClusterTests({
diff --git 
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index e98a8fdeccb..ec9aad30bbc 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -73,7 +73,9 @@ object ZkMigrationIntegrationTest {
       MetadataVersion.IBP_3_7_IV2,
       MetadataVersion.IBP_3_7_IV4,
       MetadataVersion.IBP_3_8_IV0,
-      MetadataVersion.IBP_4_0_IV0
+      MetadataVersion.IBP_3_9_IV0,
+      MetadataVersion.IBP_3_9_IV1
+      // Note: ZK Migration is not supported in Apache Kafka 4.0 and beyond.
     ).map { mv =>
       val serverProperties = new util.HashMap[String, String]()
       serverProperties.put("inter.broker.listener.name", "EXTERNAL")
@@ -492,7 +494,7 @@ class ZkMigrationIntegrationTest {
     }
   }
 
-  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_9_IV1, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -514,7 +516,7 @@ class ZkMigrationIntegrationTest {
     val clusterId = zkCluster.clusterId()
     val kraftCluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0).
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV1).
         setClusterId(Uuid.fromString(clusterId)).
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index b71ded0cb2a..4734f053547 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -47,7 +47,7 @@ object ApiVersionsRequestTest {
     List(ClusterConfig.defaultBuilder()
       .setTypes(java.util.Collections.singleton(Type.ZK))
       .setServerProperties(serverProperties)
-      .setMetadataVersion(MetadataVersion.IBP_4_0_IV0)
+      .setMetadataVersion(MetadataVersion.latestTesting())
       .build()).asJava
   }
 
@@ -67,7 +67,7 @@ object ApiVersionsRequestTest {
     serverProperties.put("unstable.feature.versions.enable", "false")
     List(ClusterConfig.defaultBuilder()
       .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setMetadataVersion(MetadataVersion.IBP_3_7_IV4)
+      .setMetadataVersion(MetadataVersion.latestProduction())
       .build()).asJava
   }
 
@@ -83,7 +83,7 @@ object ApiVersionsRequestTest {
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
   @ClusterTemplate("testApiVersionsRequestTemplate")
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = 
MetadataVersion.IBP_4_0_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
     new ClusterConfigProperty(key = "unstable.feature.versions.enable", value 
= "true")
   ))
@@ -108,7 +108,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
   def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendApiVersionsRequest(request, 
cluster.controlPlaneListenerName().get())
-    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controlPlaneListenerName().get())
+    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controlPlaneListenerName().get(), true)
   }
 
   @ClusterTest(types = Array(Type.KRAFT))
@@ -131,22 +131,25 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
     assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
   }
 
+  // Use the latest production MV for this test
   @ClusterTemplate("testApiVersionsRequestValidationV0Template")
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = 
MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
       new ClusterConfigProperty(key = "unstable.feature.versions.enable", 
value = "false"),
   ))
   def testApiVersionsRequestValidationV0(): Unit = {
     val apiVersionsRequest = new 
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, 
cluster.clientListener())
-    validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0)
+    validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0,
+      enableUnstableLastVersion = !"false".equals(
+        
cluster.config().serverProperties().get("unstable.api.versions.enable")))
   }
 
   @ClusterTemplate("zkApiVersionsRequest")
   def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
     val apiVersionsRequest = new 
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, 
cluster.controlPlaneListenerName().get())
-    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controlPlaneListenerName().get())
+    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controlPlaneListenerName().get(), true)
   }
 
   @ClusterTest(types = Array(Type.KRAFT))
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 358005cb609..60b8789e428 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -22,11 +22,13 @@ import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ListOffsetsRequest, 
ListOffsetsResponse}
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.server.config.ServerConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import java.util.Optional
+import java.util.{Optional, Properties}
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsRequestTest extends BaseRequestTest {
@@ -34,6 +36,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   val topic = "topic"
   val partition = new TopicPartition(topic, 0)
 
+  override def modifyConfigs(props: Seq[Properties]): Unit = {
+    super.modifyConfigs(props)
+    props.foreach { p =>
+      p.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testListOffsetsErrorCodes(quorum: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 33e9d3d5d46..3353c6efcc5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -294,7 +294,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
     features.add(new BrokerRegistrationRequestData.Feature()
       .setName(MetadataVersion.FEATURE_NAME)
       .setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel())
-      .setMaxSupportedVersion(MetadataVersion.IBP_4_0_IV0.featureLevel()))
+      .setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()))
     controllerServer.controller.registerBroker(
       ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
       new BrokerRegistrationRequestData()
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 1ea720d697a..6feab4b4303 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -84,7 +84,9 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVe
     try {
       val apiVersionsResponse = 
IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse)
+      validateApiVersionsResponse(apiVersionsResponse,
+        enableUnstableLastVersion = !"false".equals(
+          
cluster.config().serverProperties().get("unstable.api.versions.enable")))
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
@@ -113,7 +115,9 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVe
       assertEquals(Errors.UNSUPPORTED_VERSION.code, 
apiVersionsResponse.data.errorCode)
       val apiVersionsResponse2 = 
IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse2)
+      validateApiVersionsResponse(apiVersionsResponse2,
+        enableUnstableLastVersion = !"false".equals(
+          
cluster.config().serverProperties().get("unstable.api.versions.enable")))
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dcd0de0e8be..c4bb10b151c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -305,6 +305,7 @@ object TestUtils extends Logging {
 
     val props = new Properties
     props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true")
+    props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
     if (zkConnect == null) {
       props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, 
TimeUnit.MINUTES.toMillis(10).toString)
       props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 632eda81b57..cfeb963327c 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -126,7 +126,7 @@ public class PartitionChangeBuilderTest {
             case (short) 1:
                 return MetadataVersion.IBP_3_7_IV2;
             case (short) 2:
-                return MetadataVersion.IBP_3_8_IV0;
+                return MetadataVersion.IBP_3_9_IV1;
             default:
                 throw new RuntimeException("Unknown PartitionChangeRecord 
version " + version);
         }
@@ -313,7 +313,7 @@ public class PartitionChangeBuilderTest {
      * Test that shrinking the ISR doesn't increase the leader epoch in later 
MVs.
      */
     @ParameterizedTest
-    @ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
+    @ValueSource(strings = {"3.6-IV0", "3.7-IV2", "3.9-IV1"})
     public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) 
{
         MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
         testTriggerLeaderEpochBumpIfNeeded(
@@ -342,7 +342,7 @@ public class PartitionChangeBuilderTest {
      * Test that shrinking the ISR does increase the leader epoch in later MVs 
when ZK migration is on.
      */
     @ParameterizedTest
-    @ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
+    @ValueSource(strings = {"3.6-IV0", "3.7-IV2", "3.9-IV1"})
     public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String 
metadataVersionString) {
         MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
         testTriggerLeaderEpochBumpIfNeeded(
@@ -358,7 +358,7 @@ public class PartitionChangeBuilderTest {
      * Test that expanding the ISR doesn't increase the leader epoch.
      */
     @ParameterizedTest
-    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", 
"3.9-IV1"})
     public void testNoLeaderEpochBumpOnIsrExpansion(String 
metadataVersionString) {
         MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
         testTriggerLeaderEpochBumpIfNeeded(
@@ -372,7 +372,7 @@ public class PartitionChangeBuilderTest {
      * Test that expanding the ISR doesn't increase the leader epoch during ZK 
migration.
      */
     @ParameterizedTest
-    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", 
"3.9-IV1"})
     public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String 
metadataVersionString) {
         MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
         testTriggerLeaderEpochBumpIfNeeded(
@@ -389,7 +389,7 @@ public class PartitionChangeBuilderTest {
      * always results in a leader epoch increase.
      */
     @ParameterizedTest
-    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", 
"3.9-IV1"})
     public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String 
metadataVersionString) {
         MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
         testTriggerLeaderEpochBumpIfNeeded(
@@ -403,7 +403,7 @@ public class PartitionChangeBuilderTest {
      * cannot actually change the ISR, 
triggerLeaderEpochBumpForIsrShrinkIfNeeded does not engage.
      */
     @ParameterizedTest
-    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"})
     public void testNoLeaderEpochBumpOnEmptyTargetIsr(String 
metadataVersionString) {
         MetadataVersion metadataVersion = 
MetadataVersion.fromVersionString(metadataVersionString);
         PartitionRegistration partition = new PartitionRegistration.Builder().
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 8eed783158f..339bb58cde5 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -197,7 +197,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV0)).
+                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
                 setBrokerId(0).
                 
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
                 setClusterId(logEnv.clusterId())).get();
@@ -241,7 +241,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
                     setBrokerId(0).
                     
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
                     setClusterId(logEnv.clusterId())).get();
@@ -299,7 +299,7 @@ public class QuorumControllerTest {
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
-                        
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV0)).
+                        
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
                         setIncarnationId(Uuid.randomUuid()).
                         setListeners(listeners));
                 brokerEpochs.put(brokerId, reply.get().epoch());
@@ -381,7 +381,7 @@ public class QuorumControllerTest {
                 }).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
 
-                
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_8_IV0, 
"test-provided bootstrap ELR enabled")).
+                
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_9_IV1, 
"test-provided bootstrap ELR enabled")).
                 build()
         ) {
             ListenerCollection listeners = new ListenerCollection();
@@ -395,7 +395,7 @@ public class QuorumControllerTest {
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
-                        
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV0)).
+                        
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_9_IV1)).
                         setIncarnationId(Uuid.randomUuid()).
                         
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
                         setListeners(listeners));
@@ -464,7 +464,7 @@ public class QuorumControllerTest {
                 new BrokerRegistrationRequestData().
                     setBrokerId(brokerToUncleanShutdown).
                     setClusterId(active.clusterId()).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_9_IV1)).
                     setIncarnationId(Uuid.randomUuid()).
                     setLogDirs(Collections.singletonList(Uuid.randomUuid())).
                     setListeners(listeners)).get();
@@ -477,7 +477,7 @@ public class QuorumControllerTest {
                 new BrokerRegistrationRequestData().
                     setBrokerId(lastKnownElr[0]).
                     setClusterId(active.clusterId()).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_9_IV1)).
                     setIncarnationId(Uuid.randomUuid()).
                     setLogDirs(Collections.singletonList(Uuid.randomUuid())).
                     setListeners(listeners)).get();
@@ -718,7 +718,7 @@ public class QuorumControllerTest {
                     setBrokerId(0).
                     setClusterId(active.clusterId()).
                     
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.latestTesting())).
                     
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
                     setListeners(listeners));
             assertEquals(5L, reply.get().epoch());
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index cce2a9ebb05..f0083461f5d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -287,7 +287,7 @@ public class PartitionRegistrationTest {
         return Stream.of(
             MetadataVersion.IBP_3_7_IV1,
             MetadataVersion.IBP_3_7_IV2,
-            MetadataVersion.IBP_3_8_IV0
+            MetadataVersion.IBP_3_9_IV1
         ).map(mv -> Arguments.of(mv));
     }
 
@@ -373,7 +373,7 @@ public class PartitionRegistrationTest {
             setPartitionEpoch(0);
         List<UnwritableMetadataException> exceptions = new ArrayList<>();
         ImageWriterOptions options = new ImageWriterOptions.Builder().
-            setMetadataVersion(MetadataVersion.IBP_3_8_IV0).
+            setMetadataVersion(MetadataVersion.IBP_3_9_IV1).
             setLossHandler(exceptions::add).
             build();
         assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), 
partitionRegistration.toRecord(topicID, 0, options));
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index e7549dd8bad..5b550e5969a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -202,11 +202,23 @@ public enum MetadataVersion {
     // Add new fetch request version for KIP-951
     IBP_3_7_IV4(19, "3.7", "IV4", false),
 
+    // New version for the Kafka 3.8.0 release.
+    IBP_3_8_IV0(20, "3.8", "IV0", false),
+
+    //
+    // NOTE: MetadataVersions after this point are unstable and may be changed.
+    // If users attempt to use an unstable MetadataVersion, they will get an 
error.
+    // Please move this comment when updating the LATEST_PRODUCTION constant.
+    //
+
+    // Support ListOffsetRequest v9 for KIP-1005.
+    IBP_3_9_IV0(21, "3.9", "IV0", false),
+
     // Add ELR related supports (KIP-966).
-    IBP_3_8_IV0(20, "3.8", "IV0", true),
+    IBP_3_9_IV1(22, "3.9", "IV1", true),
 
     // Introduce version 1 of the GroupVersion feature (KIP-848).
-    IBP_4_0_IV0(21, "4.0", "IV0", false);
+    IBP_4_0_IV0(23, "4.0", "IV0", false);
 
     // NOTES when adding a new version:
     //   Update the default version in @ClusterTest annotation to point to the 
latest version
@@ -232,7 +244,7 @@ public enum MetadataVersion {
      * <strong>Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
      * IT CANNOT BE CHANGED.</strong>
      */
-    public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+    public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV0;
 
     /**
      * An array containing all of the MetadataVersion entries.
@@ -331,7 +343,7 @@ public enum MetadataVersion {
     }
 
     public boolean isElrSupported() {
-        return this.isAtLeast(IBP_3_8_IV0);
+        return this.isAtLeast(IBP_3_9_IV1);
     }
 
     public boolean isKRaftSupported() {
@@ -459,7 +471,9 @@ public enum MetadataVersion {
     }
 
     public short listOffsetRequestVersion() {
-        if (this.isAtLeast(IBP_3_5_IV0)) {
+        if (this.isAtLeast(IBP_3_9_IV0)) {
+            return 9;
+        } else if (this.isAtLeast(IBP_3_5_IV0)) {
             return 8;
         } else if (this.isAtLeast(IBP_3_0_IV1)) {
             return 7;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
index 2387f838341..daed7bbc7ea 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
@@ -23,8 +23,8 @@ public enum TestFeatureVersion implements FeatureVersion {
 
     // TEST_1 released right before MV 3.7-IVO was released, and it has no 
dependencies
     TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
-    // TEST_2 released right before MV 3.8-IVO was released, and it depends on 
this metadata version
-    TEST_2(2, MetadataVersion.IBP_3_8_IV0, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_8_IV0.featureLevel()));
+    // TEST_2 released right before MV 3.9-IVO was released, and it depends on 
this metadata version
+    TEST_2(2, MetadataVersion.IBP_3_9_IV0, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_9_IV0.featureLevel()));
 
     private final short featureLevel;
     private final MetadataVersion metadataVersionMapping;
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java 
b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
index e11e554002e..e52ae360cd0 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
@@ -110,7 +110,7 @@ public class FeaturesTest {
     @EnumSource(MetadataVersion.class)
     public void testDefaultTestVersion(MetadataVersion metadataVersion) {
         short expectedVersion;
-        if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) {
+        if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_9_IV0)) {
             expectedVersion = 2;
         } else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) {
             expectedVersion = 1;
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index d8397ce615a..74decc2c56c 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -184,8 +184,13 @@ class MetadataVersionTest {
         assertEquals(IBP_3_7_IV3, 
MetadataVersion.fromVersionString("3.7-IV3"));
         assertEquals(IBP_3_7_IV4, 
MetadataVersion.fromVersionString("3.7-IV4"));
 
+        // 3.8-IV0 is the latest production version in the 3.8 line
+        assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8"));
         assertEquals(IBP_3_8_IV0, 
MetadataVersion.fromVersionString("3.8-IV0"));
 
+        assertEquals(IBP_3_9_IV0, 
MetadataVersion.fromVersionString("3.9-IV0"));
+        assertEquals(IBP_3_9_IV1, 
MetadataVersion.fromVersionString("3.9-IV1"));
+
         assertEquals(IBP_4_0_IV0, 
MetadataVersion.fromVersionString("4.0-IV0"));
     }
 
@@ -247,6 +252,8 @@ class MetadataVersionTest {
         assertEquals("3.7", IBP_3_7_IV3.shortVersion());
         assertEquals("3.7", IBP_3_7_IV4.shortVersion());
         assertEquals("3.8", IBP_3_8_IV0.shortVersion());
+        assertEquals("3.9", IBP_3_9_IV0.shortVersion());
+        assertEquals("3.9", IBP_3_9_IV1.shortVersion());
         assertEquals("4.0", IBP_4_0_IV0.shortVersion());
     }
 
@@ -297,6 +304,8 @@ class MetadataVersionTest {
         assertEquals("3.7-IV3", IBP_3_7_IV3.version());
         assertEquals("3.7-IV4", IBP_3_7_IV4.version());
         assertEquals("3.8-IV0", IBP_3_8_IV0.version());
+        assertEquals("3.9-IV0", IBP_3_9_IV0.version());
+        assertEquals("3.9-IV1", IBP_3_9_IV1.version());
         assertEquals("4.0-IV0", IBP_4_0_IV0.version());
     }
 
@@ -365,7 +374,7 @@ class MetadataVersionTest {
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class)
     public void testIsElrSupported(MetadataVersion metadataVersion) {
-        assertEquals(metadataVersion.isAtLeast(IBP_3_8_IV0), 
metadataVersion.isElrSupported());
+        assertEquals(metadataVersion.isAtLeast(IBP_3_9_IV1), 
metadataVersion.isElrSupported());
     }
 
     @ParameterizedTest
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
index 7497ee762b2..ef7b839d019 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
@@ -120,6 +121,8 @@ public class TieredStorageTestUtils {
         //
         // The second-tier storage system is mocked via the LocalTieredStorage 
instance which persists transferred
         // data files on the local file system.
+        overridingProps.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true");
+        
overridingProps.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, 
"true");
         overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true");
         overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
LocalTieredStorage.class.getName());
         
overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 987f6afddde..c5bc3487526 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -74,7 +74,8 @@ public class FeatureCommandTest {
                 "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 
3.3-IV1\t", outputWithoutEpoch(features.get(1)));
     }
 
-    @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_7_IV4)
+    // Use the first MetadataVersion that supports KIP-919
+    @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_7_IV0)
     public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance 
cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(0, 
FeatureCommand.mainNoExit("--bootstrap-controller", 
cluster.bootstrapControllers(), "describe"))
@@ -87,7 +88,7 @@ public class FeatureCommandTest {
 
         // Change expected message to reflect latest MetadataVersion 
(SupportedMaxVersion increases when adding a new version)
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.0-IV1\t" +
-                "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 
3.7-IV4\t", outputWithoutEpoch(features.get(1)));
+                "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(1)));
     }
 
     @ClusterTest(types = {Type.ZK}, metadataVersion = 
MetadataVersion.IBP_3_3_IV1)
@@ -146,7 +147,7 @@ public class FeatureCommandTest {
         );
         // Change expected message to reflect possible MetadataVersion range 
1-N (N increases when adding a new version)
         assertEquals("Could not disable metadata.version. Invalid update 
version 0 for feature " +
-                "metadata.version. Local controller 3000 only supports 
versions 1-21", commandOutput);
+                "metadata.version. Local controller 3000 only supports 
versions 1-23", commandOutput);
 
         commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),


Reply via email to