This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ebaa108967f KAFKA-16968: Introduce 3.8-IV0, 3.9-IV0, 3.9-IV1 ebaa108967f is described below commit ebaa108967fca35b2fce1d9052e3641046e510fd Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Thu Jun 27 14:03:03 2024 -0700 KAFKA-16968: Introduce 3.8-IV0, 3.9-IV0, 3.9-IV1 Create 3 new metadata versions: - 3.8-IV0, for the upcoming 3.8 release. - 3.9-IV0, to add support for KIP-1005. - 3.9-IV1, as the new release vehicle for KIP-966. Create ListOffsetRequest v9, which will be used in 3.9-IV0 to support KIP-1005. v9 is currently an unstable API version. Reviewers: Jun Rao <jun...@gmail.com>, Justine Olshan <jols...@confluent.io> --- .../common/message/ListOffsetsRequest.json | 5 ++++- .../common/message/ListOffsetsResponse.json | 4 +++- .../java/kafka/test/ClusterTestExtensionsTest.java | 2 +- .../kafka/zk/ZkMigrationIntegrationTest.scala | 8 +++++--- .../unit/kafka/server/ApiVersionsRequestTest.scala | 17 ++++++++------- .../unit/kafka/server/ListOffsetsRequestTest.scala | 11 +++++++++- .../unit/kafka/server/ReplicationQuotasTest.scala | 2 +- .../kafka/server/SaslApiVersionsRequestTest.scala | 8 ++++++-- .../test/scala/unit/kafka/utils/TestUtils.scala | 1 + .../controller/PartitionChangeBuilderTest.java | 14 ++++++------- .../kafka/controller/QuorumControllerTest.java | 16 +++++++-------- .../kafka/metadata/PartitionRegistrationTest.java | 4 ++-- .../kafka/server/common/MetadataVersion.java | 24 +++++++++++++++++----- .../kafka/server/common/TestFeatureVersion.java | 4 ++-- .../apache/kafka/server/common/FeaturesTest.java | 2 +- .../kafka/server/common/MetadataVersionTest.java | 11 +++++++++- .../storage/utils/TieredStorageTestUtils.java | 3 +++ .../org/apache/kafka/tools/FeatureCommandTest.java | 7 ++++--- 18 files changed, 97 insertions(+), 46 deletions(-) diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json index 5591c5f760a..2b30c974c99 100644 --- a/clients/src/main/resources/common/message/ListOffsetsRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json @@ -34,9 +34,12 @@ // Version 7 enables listing offsets by max timestamp (KIP-734). // // Version 8 enables listing offsets by local log start offset (KIP-405). - "validVersions": "0-8", + // + // Version 9 enables listing offsets by last tiered offset (KIP-1005). + "validVersions": "0-9", "deprecatedVersions": "0", "flexibleVersions": "6+", + "latestVersionUnstable": true, "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The broker ID of the requester, or -1 if this request is being made by a normal consumer." }, diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json index 76177a60a9d..05e164d9128 100644 --- a/clients/src/main/resources/common/message/ListOffsetsResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json @@ -34,7 +34,9 @@ // // Version 8 enables listing offsets by local log start offset. // This is the earliest log start offset in the local log. (KIP-405). - "validVersions": "0-8", + // + // Version 9 enables listing offsets by last tiered offset (KIP-1005). + "validVersions": "0-9", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index c698632094e..498936196ed 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -187,7 +187,7 @@ public class ClusterTestExtensionsTest { @ClusterTest public void testDefaults(ClusterInstance clusterInstance) { - Assertions.assertEquals(MetadataVersion.IBP_4_0_IV0, clusterInstance.config().metadataVersion()); + Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion()); } @ClusterTests({ diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index e98a8fdeccb..ec9aad30bbc 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -73,7 +73,9 @@ object ZkMigrationIntegrationTest { MetadataVersion.IBP_3_7_IV2, MetadataVersion.IBP_3_7_IV4, MetadataVersion.IBP_3_8_IV0, - MetadataVersion.IBP_4_0_IV0 + MetadataVersion.IBP_3_9_IV0, + MetadataVersion.IBP_3_9_IV1 + // Note: ZK Migration is not supported in Apache Kafka 4.0 and beyond. ).map { mv => val serverProperties = new util.HashMap[String, String]() serverProperties.put("inter.broker.listener.name", "EXTERNAL") @@ -492,7 +494,7 @@ class ZkMigrationIntegrationTest { } } - @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_9_IV1, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -514,7 +516,7 @@ class ZkMigrationIntegrationTest { val clusterId = zkCluster.clusterId() val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). - setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0). + setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV1). setClusterId(Uuid.fromString(clusterId)). setNumBrokerNodes(0). setNumControllerNodes(1).build()) diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index b71ded0cb2a..4734f053547 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -47,7 +47,7 @@ object ApiVersionsRequestTest { List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) .setServerProperties(serverProperties) - .setMetadataVersion(MetadataVersion.IBP_4_0_IV0) + .setMetadataVersion(MetadataVersion.latestTesting()) .build()).asJava } @@ -67,7 +67,7 @@ object ApiVersionsRequestTest { serverProperties.put("unstable.feature.versions.enable", "false") List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) - .setMetadataVersion(MetadataVersion.IBP_3_7_IV4) + .setMetadataVersion(MetadataVersion.latestProduction()) .build()).asJava } @@ -83,7 +83,7 @@ object ApiVersionsRequestTest { class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { @ClusterTemplate("testApiVersionsRequestTemplate") - @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_4_0_IV0, serverProperties = Array( + @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true") )) @@ -108,7 +108,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio def testApiVersionsRequestThroughControlPlaneListener(): Unit = { val request = new ApiVersionsRequest.Builder().build() val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controlPlaneListenerName().get()) - validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get()) + validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true) } @ClusterTest(types = Array(Type.KRAFT)) @@ -131,22 +131,25 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion()) } + // Use the latest production MV for this test @ClusterTemplate("testApiVersionsRequestValidationV0Template") - @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( + @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "false"), )) def testApiVersionsRequestValidationV0(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener()) - validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0) + validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0, + enableUnstableLastVersion = !"false".equals( + cluster.config().serverProperties().get("unstable.api.versions.enable"))) } @ClusterTemplate("zkApiVersionsRequest") def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controlPlaneListenerName().get()) - validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get()) + validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true) } @ClusterTest(types = Array(Type.KRAFT)) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 358005cb609..60b8789e428 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -22,11 +22,13 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.{IsolationLevel, TopicPartition} +import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import java.util.Optional +import java.util.{Optional, Properties} +import scala.collection.Seq import scala.jdk.CollectionConverters._ class ListOffsetsRequestTest extends BaseRequestTest { @@ -34,6 +36,13 @@ class ListOffsetsRequestTest extends BaseRequestTest { val topic = "topic" val partition = new TopicPartition(topic, 0) + override def modifyConfigs(props: Seq[Properties]): Unit = { + super.modifyConfigs(props) + props.foreach { p => + p.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true") + } + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testListOffsetsErrorCodes(quorum: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala index 33e9d3d5d46..3353c6efcc5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -294,7 +294,7 @@ class ReplicationQuotasTest extends QuorumTestHarness { features.add(new BrokerRegistrationRequestData.Feature() .setName(MetadataVersion.FEATURE_NAME) .setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel()) - .setMaxSupportedVersion(MetadataVersion.IBP_4_0_IV0.featureLevel())) + .setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel())) controllerServer.controller.registerBroker( ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData() diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 1ea720d697a..6feab4b4303 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -84,7 +84,9 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe try { val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - validateApiVersionsResponse(apiVersionsResponse) + validateApiVersionsResponse(apiVersionsResponse, + enableUnstableLastVersion = !"false".equals( + cluster.config().serverProperties().get("unstable.api.versions.enable"))) sendSaslHandshakeRequestValidateResponse(socket) } finally { socket.close() @@ -113,7 +115,9 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode) val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - validateApiVersionsResponse(apiVersionsResponse2) + validateApiVersionsResponse(apiVersionsResponse2, + enableUnstableLastVersion = !"false".equals( + cluster.config().serverProperties().get("unstable.api.versions.enable"))) sendSaslHandshakeRequestValidateResponse(socket) } finally { socket.close() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index dcd0de0e8be..c4bb10b151c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -305,6 +305,7 @@ object TestUtils extends Logging { val props = new Properties props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true") + props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true") if (zkConnect == null) { props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, TimeUnit.MINUTES.toMillis(10).toString) props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString) diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index 632eda81b57..cfeb963327c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -126,7 +126,7 @@ public class PartitionChangeBuilderTest { case (short) 1: return MetadataVersion.IBP_3_7_IV2; case (short) 2: - return MetadataVersion.IBP_3_8_IV0; + return MetadataVersion.IBP_3_9_IV1; default: throw new RuntimeException("Unknown PartitionChangeRecord version " + version); } @@ -313,7 +313,7 @@ public class PartitionChangeBuilderTest { * Test that shrinking the ISR doesn't increase the leader epoch in later MVs. */ @ParameterizedTest - @ValueSource(strings = {"3.6-IV0", "3.7-IV4"}) + @ValueSource(strings = {"3.6-IV0", "3.7-IV2", "3.9-IV1"}) public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) { MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); testTriggerLeaderEpochBumpIfNeeded( @@ -342,7 +342,7 @@ public class PartitionChangeBuilderTest { * Test that shrinking the ISR does increase the leader epoch in later MVs when ZK migration is on. */ @ParameterizedTest - @ValueSource(strings = {"3.6-IV0", "3.7-IV4"}) + @ValueSource(strings = {"3.6-IV0", "3.7-IV2", "3.9-IV1"}) public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersionString) { MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); testTriggerLeaderEpochBumpIfNeeded( @@ -358,7 +358,7 @@ public class PartitionChangeBuilderTest { * Test that expanding the ISR doesn't increase the leader epoch. */ @ParameterizedTest - @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"}) + @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "3.9-IV1"}) public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) { MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); testTriggerLeaderEpochBumpIfNeeded( @@ -372,7 +372,7 @@ public class PartitionChangeBuilderTest { * Test that expanding the ISR doesn't increase the leader epoch during ZK migration. */ @ParameterizedTest - @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"}) + @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "3.9-IV1"}) public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVersionString) { MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); testTriggerLeaderEpochBumpIfNeeded( @@ -389,7 +389,7 @@ public class PartitionChangeBuilderTest { * always results in a leader epoch increase. */ @ParameterizedTest - @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"}) + @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "3.9-IV1"}) public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) { MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); testTriggerLeaderEpochBumpIfNeeded( @@ -403,7 +403,7 @@ public class PartitionChangeBuilderTest { * cannot actually change the ISR, triggerLeaderEpochBumpForIsrShrinkIfNeeded does not engage. */ @ParameterizedTest - @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"}) + @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"}) public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) { MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString); PartitionRegistration partition = new PartitionRegistration.Builder(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 8eed783158f..339bb58cde5 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -197,7 +197,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))). setClusterId(logEnv.clusterId())).get(); @@ -241,7 +241,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))). setClusterId(logEnv.clusterId())).get(); @@ -299,7 +299,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(brokerId). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). setIncarnationId(Uuid.randomUuid()). setListeners(listeners)); brokerEpochs.put(brokerId, reply.get().epoch()); @@ -381,7 +381,7 @@ public class QuorumControllerTest { }). setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). - setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_8_IV0, "test-provided bootstrap ELR enabled")). + setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_9_IV1, "test-provided bootstrap ELR enabled")). build() ) { ListenerCollection listeners = new ListenerCollection(); @@ -395,7 +395,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(brokerId). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_9_IV1)). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). setListeners(listeners)); @@ -464,7 +464,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(brokerToUncleanShutdown). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_9_IV1)). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). setListeners(listeners)).get(); @@ -477,7 +477,7 @@ public class QuorumControllerTest { new BrokerRegistrationRequestData(). setBrokerId(lastKnownElr[0]). setClusterId(active.clusterId()). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_9_IV1)). setIncarnationId(Uuid.randomUuid()). setLogDirs(Collections.singletonList(Uuid.randomUuid())). setListeners(listeners)).get(); @@ -718,7 +718,7 @@ public class QuorumControllerTest { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())). setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))). setListeners(listeners)); assertEquals(5L, reply.get().epoch()); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index cce2a9ebb05..f0083461f5d 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -287,7 +287,7 @@ public class PartitionRegistrationTest { return Stream.of( MetadataVersion.IBP_3_7_IV1, MetadataVersion.IBP_3_7_IV2, - MetadataVersion.IBP_3_8_IV0 + MetadataVersion.IBP_3_9_IV1 ).map(mv -> Arguments.of(mv)); } @@ -373,7 +373,7 @@ public class PartitionRegistrationTest { setPartitionEpoch(0); List<UnwritableMetadataException> exceptions = new ArrayList<>(); ImageWriterOptions options = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_8_IV0). + setMetadataVersion(MetadataVersion.IBP_3_9_IV1). setLossHandler(exceptions::add). build(); assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options)); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index e7549dd8bad..5b550e5969a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -202,11 +202,23 @@ public enum MetadataVersion { // Add new fetch request version for KIP-951 IBP_3_7_IV4(19, "3.7", "IV4", false), + // New version for the Kafka 3.8.0 release. + IBP_3_8_IV0(20, "3.8", "IV0", false), + + // + // NOTE: MetadataVersions after this point are unstable and may be changed. + // If users attempt to use an unstable MetadataVersion, they will get an error. + // Please move this comment when updating the LATEST_PRODUCTION constant. + // + + // Support ListOffsetRequest v9 for KIP-1005. + IBP_3_9_IV0(21, "3.9", "IV0", false), + // Add ELR related supports (KIP-966). - IBP_3_8_IV0(20, "3.8", "IV0", true), + IBP_3_9_IV1(22, "3.9", "IV1", true), // Introduce version 1 of the GroupVersion feature (KIP-848). - IBP_4_0_IV0(21, "4.0", "IV0", false); + IBP_4_0_IV0(23, "4.0", "IV0", false); // NOTES when adding a new version: // Update the default version in @ClusterTest annotation to point to the latest version @@ -232,7 +244,7 @@ public enum MetadataVersion { * <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED.</strong> */ - public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4; + public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV0; /** * An array containing all of the MetadataVersion entries. @@ -331,7 +343,7 @@ public enum MetadataVersion { } public boolean isElrSupported() { - return this.isAtLeast(IBP_3_8_IV0); + return this.isAtLeast(IBP_3_9_IV1); } public boolean isKRaftSupported() { @@ -459,7 +471,9 @@ public enum MetadataVersion { } public short listOffsetRequestVersion() { - if (this.isAtLeast(IBP_3_5_IV0)) { + if (this.isAtLeast(IBP_3_9_IV0)) { + return 9; + } else if (this.isAtLeast(IBP_3_5_IV0)) { return 8; } else if (this.isAtLeast(IBP_3_0_IV1)) { return 7; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java index 2387f838341..daed7bbc7ea 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java @@ -23,8 +23,8 @@ public enum TestFeatureVersion implements FeatureVersion { // TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), - // TEST_2 released right before MV 3.8-IVO was released, and it depends on this metadata version - TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel())); + // TEST_2 released right before MV 3.9-IVO was released, and it depends on this metadata version + TEST_2(2, MetadataVersion.IBP_3_9_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel())); private final short featureLevel; private final MetadataVersion metadataVersionMapping; diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java index e11e554002e..e52ae360cd0 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java @@ -110,7 +110,7 @@ public class FeaturesTest { @EnumSource(MetadataVersion.class) public void testDefaultTestVersion(MetadataVersion metadataVersion) { short expectedVersion; - if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) { + if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_9_IV0)) { expectedVersion = 2; } else if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV0)) { expectedVersion = 1; diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index d8397ce615a..74decc2c56c 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -184,8 +184,13 @@ class MetadataVersionTest { assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3")); assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4")); + // 3.8-IV0 is the latest production version in the 3.8 line + assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8")); assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0")); + assertEquals(IBP_3_9_IV0, MetadataVersion.fromVersionString("3.9-IV0")); + assertEquals(IBP_3_9_IV1, MetadataVersion.fromVersionString("3.9-IV1")); + assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0")); } @@ -247,6 +252,8 @@ class MetadataVersionTest { assertEquals("3.7", IBP_3_7_IV3.shortVersion()); assertEquals("3.7", IBP_3_7_IV4.shortVersion()); assertEquals("3.8", IBP_3_8_IV0.shortVersion()); + assertEquals("3.9", IBP_3_9_IV0.shortVersion()); + assertEquals("3.9", IBP_3_9_IV1.shortVersion()); assertEquals("4.0", IBP_4_0_IV0.shortVersion()); } @@ -297,6 +304,8 @@ class MetadataVersionTest { assertEquals("3.7-IV3", IBP_3_7_IV3.version()); assertEquals("3.7-IV4", IBP_3_7_IV4.version()); assertEquals("3.8-IV0", IBP_3_8_IV0.version()); + assertEquals("3.9-IV0", IBP_3_9_IV0.version()); + assertEquals("3.9-IV1", IBP_3_9_IV1.version()); assertEquals("4.0-IV0", IBP_4_0_IV0.version()); } @@ -365,7 +374,7 @@ class MetadataVersionTest { @ParameterizedTest @EnumSource(value = MetadataVersion.class) public void testIsElrSupported(MetadataVersion metadataVersion) { - assertEquals(metadataVersion.isAtLeast(IBP_3_8_IV0), metadataVersion.isElrSupported()); + assertEquals(metadataVersion.isAtLeast(IBP_3_9_IV1), metadataVersion.isElrSupported()); } @ParameterizedTest diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java index 7497ee762b2..ef7b839d019 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.record.Record; +import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; @@ -120,6 +121,8 @@ public class TieredStorageTestUtils { // // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred // data files on the local file system. + overridingProps.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"); + overridingProps.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true"); overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName()); overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index 987f6afddde..c5bc3487526 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -74,7 +74,8 @@ public class FeatureCommandTest { "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(features.get(1))); } - @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4) + // Use the first MetadataVersion that supports KIP-919 + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV0) public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster) { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe")) @@ -87,7 +88,7 @@ public class FeatureCommandTest { // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(features.get(1))); + "SupportedMaxVersion: 4.0-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(1))); } @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) @@ -146,7 +147,7 @@ public class FeatureCommandTest { ); // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 3000 only supports versions 1-21", commandOutput); + "metadata.version. Local controller 3000 only supports versions 1-23", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),