This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 53b048bf0ba KAFKA-15718: Refactor UncleanLeaderElectionTest to enable KRaft later (#16157) 53b048bf0ba is described below commit 53b048bf0ba4b4c68cae2fb0bf43f6a9a0f62e06 Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com> AuthorDate: Mon Jun 10 12:15:34 2024 +0100 KAFKA-15718: Refactor UncleanLeaderElectionTest to enable KRaft later (#16157) Refactor UncleanLeaderElectionTest to allow to enable KRaft later Reviewers: Luke Chen <show...@gmail.com> --- .../integration/UncleanLeaderElectionTest.scala | 224 ++++++++++++--------- .../scala/unit/kafka/server/FetchRequestTest.scala | 6 +- .../unit/kafka/server/ListOffsetsRequestTest.scala | 4 +- .../scala/unit/kafka/server/LogRecoveryTest.scala | 6 +- .../server/OffsetsForLeaderEpochRequestTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 28 ++- 6 files changed, 161 insertions(+), 109 deletions(-) diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 7ed403f0a95..828f6eb111c 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -17,28 +17,29 @@ package kafka.integration -import org.apache.kafka.common.config.{ConfigException, ConfigResource} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} - +import java.util.Properties +import java.util.concurrent.ExecutionException import scala.util.Random import scala.jdk.CollectionConverters._ import scala.collection.{Map, Seq} -import org.apache.log4j.{Level, Logger} -import java.util.Properties -import java.util.concurrent.ExecutionException - -import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness} import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.TestUtils._ -import kafka.server.QuorumTestHarness import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.TimeoutException +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutException} import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsResult, Config, ConfigEntry} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry} import org.apache.kafka.server.config.ReplicationConfigs +import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.log4j.{Level, Logger} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ - -import scala.annotation.nowarn +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import com.yammer.metrics.core.Meter class UncleanLeaderElectionTest extends QuorumTestHarness { val brokerId1 = 0 @@ -52,11 +53,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { var configProps2: Properties = _ var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig] - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var brokers: Seq[KafkaBroker] = Seq.empty[KafkaBroker] + + var admin: Admin = _ val random = new Random() val topic = "topic" + random.nextLong() val partitionId = 0 + val topicPartition = new TopicPartition(topic, partitionId) val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis]) val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor]) @@ -65,8 +69,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - configProps1 = createBrokerConfig(brokerId1, zkConnect) - configProps2 = createBrokerConfig(brokerId2, zkConnect) + configProps1 = createBrokerConfig(brokerId1, zkConnectOrNull) + configProps2 = createBrokerConfig(brokerId2, zkConnectOrNull) for (configProps <- List(configProps1, configProps2)) { configProps.put("controlled.shutdown.enable", enableControlledShutdown.toString) @@ -81,50 +85,57 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { @AfterEach override def tearDown(): Unit = { - servers.foreach(server => shutdownServer(server)) - servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + brokers.foreach(broker => shutdownBroker(broker)) + brokers.foreach(broker => CoreUtils.delete(broker.config.logDirs)) // restore log levels kafkaApisLogger.setLevel(Level.ERROR) networkProcessorLogger.setLevel(Level.ERROR) + admin.close() + super.tearDown() } private def startBrokers(cluster: Seq[Properties]): Unit = { for (props <- cluster) { val config = KafkaConfig.fromProps(props) - val server = createServer(config) + val broker = createBroker(config = config) configs ++= List(config) - servers ++= List(server) + brokers ++= List(broker) } + + val adminConfigs = new Properties + admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs) } - @Test - def testUncleanLeaderElectionEnabled(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk")) + def testUncleanLeaderElectionEnabled(quorum: String): Unit = { // enable unclean leader election configProps1.put("unclean.leader.election.enable", "true") configProps2.put("unclean.leader.election.enable", "true") startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers) - + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2))) verifyUncleanLeaderElectionEnabled() } - @Test + @ParameterizedTest + @ValueSource(strings = Array("zk")) def testUncleanLeaderElectionDisabled(): Unit = { // unclean leader election is disabled by default startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2))) verifyUncleanLeaderElectionDisabled() } - @Test + @ParameterizedTest + @ValueSource(strings = Array("zk")) def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = { // disable unclean leader election globally, but enable for our specific test topic configProps1.put("unclean.leader.election.enable", "false") @@ -133,13 +144,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", "true") - TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps) + topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig = topicProps) verifyUncleanLeaderElectionEnabled() } - @Test + @ParameterizedTest + @ValueSource(strings = Array("zk")) def testUncleanLeaderElectionDisabledByTopicOverride(): Unit = { // enable unclean leader election globally, but disable for our specific test topic configProps1.put("unclean.leader.election.enable", "true") @@ -148,58 +160,64 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", "false") - TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps) + topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false") + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig = topicProps) verifyUncleanLeaderElectionDisabled() } - @Test + @ParameterizedTest + @ValueSource(strings = Array("zk")) def testUncleanLeaderElectionInvalidTopicOverride(): Unit = { startBrokers(Seq(configProps1)) // create topic with an invalid value for unclean leader election val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", "invalid") + topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "invalid") + + val e = assertThrows(classOf[ExecutionException], + () => TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig = topicProps)) - assertThrows(classOf[ConfigException], - () => TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1)), servers, topicProps)) + assertEquals(classOf[InvalidConfigurationException], e.getCause.getClass) } def verifyUncleanLeaderElectionEnabled(): Unit = { // wait until leader is elected - val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) - debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) + val leaderId = awaitLeaderChange(brokers, topicPartition) + debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) assertTrue(leaderId == brokerId1 || leaderId == brokerId2, "Leader id is set to expected value for topic: " + topic) // the non-leader broker is the follower val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 - debug("Follower for " + topic + " is: %s".format(followerId)) + debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(servers, topic, "first") - waitForPartitionMetadata(servers, topic, partitionId) + produceMessage(brokers, topic, "first") + waitForPartitionMetadata(brokers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic, 1)) // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) + brokers.filter(broker => broker.config.brokerId == followerId).map(broker => shutdownBroker(broker)) - produceMessage(servers, topic, "second") + produceMessage(brokers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic, 2)) - //remove any previous unclean election metric - servers.map(_.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + //verify that unclean election metric count is 0 + val uncleanLeaderElectionsPerSecGauge = getGauge("UncleanLeaderElectionsPerSec") + @volatile var uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 0) // shutdown leader and then restart follower - servers.filter(_.config.brokerId == leaderId).map(shutdownServer) - val followerServer = servers.find(_.config.brokerId == followerId).get - followerServer.startup() + brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker) + val followerBroker = brokers.find(_.config.brokerId == followerId).get + followerBroker.startup() // wait until new leader is (uncleanly) elected - waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) - assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) + awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(followerId), timeout = 30000) + uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 1) - produceMessage(servers, topic, "third") + produceMessage(brokers, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic, 2)) @@ -207,7 +225,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { def verifyUncleanLeaderElectionDisabled(): Unit = { // wait until leader is elected - val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + val leaderId = awaitLeaderChange(brokers, topicPartition) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) assertTrue(leaderId == brokerId1 || leaderId == brokerId2, "Leader id is set to expected value for topic: " + topic) @@ -216,60 +234,70 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(servers, topic, "first") - waitForPartitionMetadata(servers, topic, partitionId) + produceMessage(brokers, topic, "first") + waitForPartitionMetadata(brokers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic, 1)) // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).foreach(server => shutdownServer(server)) + brokers.filter(broker => broker.config.brokerId == followerId).map(broker => shutdownBroker(broker)) - produceMessage(servers, topic, "second") + produceMessage(brokers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic, 2)) //remove any previous unclean election metric - servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + val uncleanLeaderElectionsPerSecGauge = getGauge("UncleanLeaderElectionsPerSec") + @volatile var uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 0) // shutdown leader and then restart follower - servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server)) - val followerServer = servers.find(_.config.brokerId == followerId).get + brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker) + val followerServer = brokers.find(_.config.brokerId == followerId).get followerServer.startup() // verify that unclean election to non-ISR follower does not occur - waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1)) - assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) + awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(leaderId)) + uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 0) // message production and consumption should both fail while leader is down - val e = assertThrows(classOf[ExecutionException], () => produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)) + val e = assertThrows(classOf[ExecutionException], () => produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)) assertEquals(classOf[TimeoutException], e.getCause.getClass) assertEquals(List.empty[String], consumeAllMessages(topic, 0)) // restart leader temporarily to send a successfully replicated message - servers.filter(server => server.config.brokerId == leaderId).foreach(server => server.startup()) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId)) + brokers.find(_.config.brokerId == leaderId).get.startup() + awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(leaderId)) - produceMessage(servers, topic, "third") + produceMessage(brokers, topic, "third") //make sure follower server joins the ISR TestUtils.waitUntilTrue(() => { val partitionInfoOpt = followerServer.metadataCache.getPartitionInfo(topic, partitionId) partitionInfoOpt.isDefined && partitionInfoOpt.get.isr.contains(followerId) }, "Inconsistent metadata after first server startup") - servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server)) - // verify clean leader transition to ISR follower - waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) + brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker) + // verify clean leader transition to ISR follower + awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(followerId)) // verify messages can be consumed from ISR follower that was just promoted to leader assertEquals(List("first", "second", "third"), consumeAllMessages(topic, 3)) } - private def shutdownServer(server: KafkaServer): Unit = { - server.shutdown() - server.awaitShutdown() + private def getGauge(metricName: String) = { + KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .find { case (k, _) => k.getName.endsWith(metricName) } + .getOrElse(throw new AssertionError("Unable to find metric " + metricName)) + ._2.asInstanceOf[Meter] + } + + private def shutdownBroker(broker: KafkaBroker) = { + broker.shutdown() + broker.awaitShutdown() } private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] = { - val brokerList = TestUtils.plaintextBootstrapServers(servers) + val brokerList = TestUtils.plaintextBootstrapServers(brokers) // Don't rely on coordinator as it may be down when this method is called val consumer = TestUtils.createConsumer(brokerList, groupId = "group" + random.nextLong(), @@ -283,42 +311,48 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { } finally consumer.close() } - @Test - def testTopicUncleanLeaderElectionEnable(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk")) + def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(): Unit = { // unclean leader election is disabled by default startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker - adminZkClient.createTopicWithAssignment(topic, config = new Properties(), Map(partitionId -> Seq(brokerId1, brokerId2))) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2))) // wait until leader is elected - val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + val leaderId = awaitLeaderChange(brokers, topicPartition) // the non-leader broker is the follower val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 - produceMessage(servers, topic, "first") - waitForPartitionMetadata(servers, topic, partitionId) + produceMessage(brokers, topic, "first") + waitForPartitionMetadata(brokers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic, 1)) // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) + brokers.filter(broker => broker.config.brokerId == followerId).map(broker => shutdownBroker(broker)) - produceMessage(servers, topic, "second") + produceMessage(brokers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic, 2)) - //remove any previous unclean election metric - servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + //verify that unclean election metric count is 0 + val uncleanLeaderElectionsPerSecGauge = getGauge("UncleanLeaderElectionsPerSec") + @volatile var uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 0) // shutdown leader and then restart follower - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - val followerServer = servers.find(_.config.brokerId == followerId).get - followerServer.startup() + brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker) + val followerBroker = brokers.find(_.config.brokerId == followerId).get + followerBroker.startup() - assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) + // leader should not change + awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(leaderId), timeout = 30000) + uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 0) // message production and consumption should both fail while leader is down - val e = assertThrows(classOf[ExecutionException], () => produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)) + val e = assertThrows(classOf[ExecutionException], () => produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)) assertEquals(classOf[TimeoutException], e.getCause.getClass) assertEquals(List.empty[String], consumeAllMessages(topic, 0)) @@ -331,26 +365,26 @@ class UncleanLeaderElectionTest extends QuorumTestHarness { adminClient.close() // wait until new leader is (uncleanly) elected - waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) - assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) + awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(followerId), timeout = 30000) + uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count() + assert(uncleanLeaderElectionsPerSec == 1) - produceMessage(servers, topic, "third") + produceMessage(brokers, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic, 2)) } - @nowarn("cat=deprecation") private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = { val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava - val newConfig = new Config(configEntries) - val configs = Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> newConfig).asJava - adminClient.alterConfigs(configs) + adminClient.incrementalAlterConfigs(Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> + configEntries.asScala.map((e: ConfigEntry) => new AlterConfigOp(e, AlterConfigOp.OpType.SET)).toSeq + .asJavaCollection).asJava) } private def createAdminClient(): Admin = { val config = new Properties - val bootstrapServers = TestUtils.plaintextBootstrapServers(servers) + val bootstrapServers = TestUtils.plaintextBootstrapServers(brokers) config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10") Admin.create(config) diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 2411e612d20..0a4f50ba120 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -223,7 +223,7 @@ class FetchRequestTest extends BaseFetchRequestTest { // Force a leader change killBroker(firstLeaderId) // Write some more data in epoch 1 - val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId) + val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId)) val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers) val secondEpochResponses = produceData(Seq(topicPartition), 100) val secondEpochEndOffset = secondEpochResponses.lastOption.get.offset + 1 @@ -285,7 +285,7 @@ class FetchRequestTest extends BaseFetchRequestTest { killBroker(firstLeaderId) // Check leader error codes - val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId) + val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId)) val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers) assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty()) assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch)) @@ -322,7 +322,7 @@ class FetchRequestTest extends BaseFetchRequestTest { // -1 is treated as having no epoch at all killBroker(firstLeaderId) - val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId) + val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId)) val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers) verifyFetchSessionErrors(topicPartition, secondLeaderEpoch, secondLeaderId, version) diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 48397a4b71d..358005cb609 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -125,7 +125,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { killBroker(firstLeaderId) // Check leader error codes - val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId) + val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId)) val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers) assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty()) assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch)) @@ -198,7 +198,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { // Kill the first leader so that we can verify the epoch change when fetching the latest offset killBroker(firstLeaderId) - val secondLeaderId = TestUtils.awaitLeaderChange(brokers, partition, firstLeaderId) + val secondLeaderId = TestUtils.awaitLeaderChange(brokers, partition, oldLeaderOpt = Some(firstLeaderId)) // make sure high watermark of new leader has caught up TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1).errorCode != Errors.OFFSET_NOT_AVAILABLE.code, "the second leader does not sync to follower") diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index a3bdb9a1723..21dba44f9f7 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -138,7 +138,7 @@ class LogRecoveryTest extends QuorumTestHarness { assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) // check if leader moves to the other server - leader = awaitLeaderChange(servers, topicPartition, leader) + leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader)) assertEquals(1, leader, "Leader must move to broker 1") // bring the preferred replica back @@ -166,7 +166,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() - leader = awaitLeaderChange(servers, topicPartition, leader) + leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader)) assertTrue(leader == 0 || leader == 1, "Leader must remain on broker 0, in case of ZooKeeper session expiration it can move to broker 1") @@ -221,7 +221,7 @@ class LogRecoveryTest extends QuorumTestHarness { server2.startup() updateProducer() // check if leader moves to the other server - leader = awaitLeaderChange(servers, topicPartition, leader) + leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader)) assertEquals(1, leader, "Leader must move to broker 1") assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L)) diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala index 78c10ec5ec4..29e1b70a05d 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala @@ -79,7 +79,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest { killBroker(firstLeaderId) // Check leader error codes - val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId) + val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId)) val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers) assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty()) assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 35b7ce418d1..5eb3187ff30 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1099,13 +1099,31 @@ object TestUtils extends Logging { def awaitLeaderChange[B <: KafkaBroker]( brokers: Seq[B], tp: TopicPartition, - oldLeader: Int, + oldLeaderOpt: Option[Int] = None, + expectedLeaderOpt: Option[Int] = None, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { def newLeaderExists: Option[Int] = { - brokers.find { broker => - broker.config.brokerId != oldLeader && - broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined) - }.map(_.config.brokerId) + if (expectedLeaderOpt.isDefined) { + debug(s"Checking leader that has changed to ${expectedLeaderOpt.get}") + brokers.find { broker => + broker.config.brokerId == expectedLeaderOpt.get && + broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined) + }.map(_.config.brokerId) + + } else if (oldLeaderOpt.isDefined) { + debug(s"Checking leader that has changed from ${oldLeaderOpt}") + brokers.find { broker => + broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined) + broker.config.brokerId != oldLeaderOpt.get && + broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined) + }.map(_.config.brokerId) + + } else { + debug(s"Checking the elected leader") + brokers.find { broker => + broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined) + }.map(_.config.brokerId) + } } waitUntilTrue(() => newLeaderExists.isDefined,