This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 98cb8df7a58 MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to
enable ELR (#20174)
98cb8df7a58 is described below
commit 98cb8df7a58f25d0c5093101ffe050f3554e6793
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Jul 15 20:23:53 2025 -0700
MINOR: Bump LATEST_PRODUCTION to 4.1IV1 and Use MV to enable ELR (#20174)
Remove the isEligibleLeaderReplicasV1Enabled test helper so that ELR is
enabled whenever the MV is at least 4.1IV1. Also bump the latest production
MV (LATEST_PRODUCTION) to 4.1IV1.
Reviewers: Jun Rao <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 18 +++++++++++++++++-
.../integration/kafka/server/QuorumTestHarness.scala | 8 +-------
core/src/test/scala/kafka/utils/TestInfoUtils.scala | 8 --------
.../kafka/integration/UncleanLeaderElectionTest.scala | 17 ++++++++++++++++-
.../apache/kafka/metadata/storage/FormatterTest.java | 3 +++
.../apache/kafka/server/common/MetadataVersion.java | 14 +++++++-------
.../kafka/server/common/MetadataVersionTest.java | 4 ++++
tests/kafkatest/version.py | 2 +-
8 files changed, 49 insertions(+), 25 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c7a45fc8c80..59eba1eb186 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -52,6 +52,7 @@ import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
MetadataVersion}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogFileUtils}
@@ -60,7 +61,7 @@ import org.apache.logging.log4j.core.config.Configurator
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{MethodSource}
+import org.junit.jupiter.params.provider.MethodSource
import org.slf4j.LoggerFactory
import java.util.AbstractMap.SimpleImmutableEntry
@@ -3002,6 +3003,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersForOnePartition(): Unit = {
// Case: unclean leader election with one topic partition
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3029,6 +3031,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersForManyPartitions(): Unit = {
// Case: unclean leader election with many topic partitions
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3068,6 +3071,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersForAllPartitions(): Unit = {
// Case: noop unclean leader election and valid unclean leader election
for all partitions
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3107,6 +3111,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersForUnknownPartitions(): Unit = {
// Case: unclean leader election for unknown topic
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3132,6 +3137,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
// Case: unclean leader election with no live brokers
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3160,6 +3166,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersNoop(): Unit = {
// Case: noop unclean leader election with explicit topic partitions
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3187,6 +3194,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def testElectUncleanLeadersAndNoop(): Unit = {
// Case: one noop unclean leader election and one valid unclean leader
election
client = createAdminClient
+ disableEligibleLeaderReplicas(client)
val broker1 = 1
val broker2 = 2
@@ -3878,6 +3886,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
testAppendConfig(props, "0:0", "1:1,0:0")
}
+ private def disableEligibleLeaderReplicas(admin: Admin): Unit = {
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
+ admin.updateFeatures(
+ util.Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, new
FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
+ new UpdateFeaturesOptions()).all().get()
+ }
+ }
+
private def testAppendConfig(props: Properties, append: String, expected:
String): Unit = {
client = createAdminClient
createTopic(topic, topicConfig = props)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 6c491e739e3..3d5837b92d0 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
MetadataVersion, TransactionVersion}
+import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
@@ -284,12 +284,6 @@ abstract class QuorumTestHarness extends Logging {
} else TransactionVersion.TV_1.featureLevel()
formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME,
transactionVersion)
- val elrVersion =
- if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
- EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
- } else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
- formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
elrVersion)
-
addFormatterSettings(formatter)
formatter.run()
val bootstrapMetadata = formatter.bootstrapMetadata()
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index 5b6a2239c93..e6c70b6e8fe 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -50,12 +50,4 @@ object TestInfoUtils {
def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
!testInfo.getDisplayName.contains("isTV2Enabled=false")
}
-
- /**
- * Returns whether eligible leader replicas version 1 is enabled.
- * When no parameter is provided, the default returned is false.
- */
- def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = {
- testInfo.getDisplayName.contains("isELRV1Enabled=true")
- }
}
diff --git
a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index e7a8e10d80e..52d9b01ff4a 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -31,7 +31,7 @@ import
org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutExc
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsResult, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsResult, ConfigEntry, FeatureUpdate,
UpdateFeaturesOptions}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import com.yammer.metrics.core.Meter
import org.apache.kafka.metadata.LeaderConstants
+import org.apache.kafka.server.common.MetadataVersion
import org.apache.logging.log4j.core.config.Configurator
class UncleanLeaderElectionTest extends QuorumTestHarness {
@@ -119,6 +120,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
admin = TestUtils.createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs)
}
+ private def disableEligibleLeaderReplicas(): Unit = {
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_1_IV0)) {
+ admin.updateFeatures(
+ java.util.Map.of("eligible.leader.replicas.version", new
FeatureUpdate(0, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
+ new UpdateFeaturesOptions()).all().get()
+ }
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUncleanLeaderElectionEnabled(groupProtocol: String): Unit = {
@@ -126,6 +135,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
startBrokers(Seq(configProps1, configProps2))
+ disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
@@ -137,6 +147,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
def testUncleanLeaderElectionDisabled(groupProtocol: String): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
+ disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
@@ -151,6 +162,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
configProps1.put("unclean.leader.election.enable", "false")
configProps2.put("unclean.leader.election.enable", "false")
startBrokers(Seq(configProps1, configProps2))
+ disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker, and
unclean leader election enabled
val topicProps = new Properties()
@@ -167,6 +179,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
startBrokers(Seq(configProps1, configProps2))
+ disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker, and
unclean leader election disabled
val topicProps = new Properties()
@@ -180,6 +193,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUncleanLeaderElectionInvalidTopicOverride(groupProtocol: String):
Unit = {
startBrokers(Seq(configProps1))
+ disableEligibleLeaderReplicas()
// create topic with an invalid value for unclean leader election
val topicProps = new Properties()
@@ -328,6 +342,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(groupProtocol:
String): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
+ disableEligibleLeaderReplicas()
// create topic with 1 partition, 2 replicas, one on each broker
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 2eeeab2259a..5ddcd2d8889 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -378,6 +378,9 @@ public class FormatterTest {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
(short) 0));
+ expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
+
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short)
0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(GroupVersion.FEATURE_NAME).
setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index f05db6d187a..dd7c5937bdc 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -105,12 +105,6 @@ public enum MetadataVersion {
// Enables async remote LIST_OFFSETS support (KIP-1075)
IBP_4_0_IV3(25, "4.0", "IV3", false),
- //
- // NOTE: MetadataVersions after this point are unstable and may be changed.
- // If users attempt to use an unstable MetadataVersion, they will get an
error.
- // Please move this comment when updating the LATEST_PRODUCTION constant.
- //
-
// Enables ELR by default for new clusters (KIP-966).
// Share groups are preview in 4.1 (KIP-932).
// Streams groups are early access in 4.1 (KIP-1071).
@@ -119,6 +113,12 @@ public enum MetadataVersion {
// Send FETCH version 18 in the replica fetcher (KIP-1166)
IBP_4_1_IV1(27, "4.1", "IV1", false),
+ //
+ // NOTE: MetadataVersions after this point are unstable and may be changed.
+ // If users attempt to use an unstable MetadataVersion, they will get an
error.
+ // Please move this comment when updating the LATEST_PRODUCTION constant.
+ //
+
// Insert any additional IBP_4_1_IVx versions above this comment, and bump
the feature level of
// IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will
cease to be
// a placeholder.
@@ -157,7 +157,7 @@ public enum MetadataVersion {
* <strong>Think carefully before you update this value. ONCE A METADATA
VERSION IS PRODUCTION,
* IT CANNOT BE CHANGED.</strong>
*/
- public static final MetadataVersion LATEST_PRODUCTION = IBP_4_0_IV3;
+ public static final MetadataVersion LATEST_PRODUCTION = IBP_4_1_IV1;
// If you change the value above please also update
// LATEST_STABLE_METADATA_VERSION version in tests/kafkatest/version.py
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 0414f7cd1cc..cdc66b8b521 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -78,12 +78,16 @@ class MetadataVersionTest {
assertEquals(IBP_3_9_IV0,
MetadataVersion.fromVersionString("3.9-IV0"));
// 4.0-IV3 is the latest production version in the 4.0 line
+ assertEquals(IBP_4_0_IV3, MetadataVersion.fromVersionString("4.0"));
assertEquals(IBP_4_0_IV0,
MetadataVersion.fromVersionString("4.0-IV0"));
assertEquals(IBP_4_0_IV1,
MetadataVersion.fromVersionString("4.0-IV1"));
assertEquals(IBP_4_0_IV2,
MetadataVersion.fromVersionString("4.0-IV2"));
assertEquals(IBP_4_0_IV3,
MetadataVersion.fromVersionString("4.0-IV3"));
+ // 4.1-IV1 is the latest production version in the 4.1 line
+ assertEquals(IBP_4_1_IV1, MetadataVersion.fromVersionString("4.1"));
assertEquals(IBP_4_1_IV0,
MetadataVersion.fromVersionString("4.1-IV0"));
+ assertEquals(IBP_4_1_IV1,
MetadataVersion.fromVersionString("4.1-IV1"));
}
@Test
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 0ee8f1aded1..cb5f70b4581 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -114,7 +114,7 @@ DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT")
LATEST_STABLE_TRANSACTION_VERSION = 2
# This should match the LATEST_PRODUCTION version defined in
MetadataVersion.java
-LATEST_STABLE_METADATA_VERSION = "4.0-IV3"
+LATEST_STABLE_METADATA_VERSION = "4.1-IV1"
# 2.1.x versions
V_2_1_0 = KafkaVersion("2.1.0")