This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53b048bf0ba KAFKA-15718: Refactor UncleanLeaderElectionTest to enable KRaft later (#16157)
53b048bf0ba is described below

commit 53b048bf0ba4b4c68cae2fb0bf43f6a9a0f62e06
Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com>
AuthorDate: Mon Jun 10 12:15:34 2024 +0100

    KAFKA-15718: Refactor UncleanLeaderElectionTest to enable KRaft later (#16157)
    
    Refactor UncleanLeaderElectionTest so that KRaft can be enabled later
    
    Reviewers: Luke Chen <show...@gmail.com>
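    
    The tests are converted to JUnit parameterized tests so that KRaft can be
    enabled later simply by widening the value source. A hedged sketch of that
    follow-up (this commit only passes "zk"; the "kraft" entry below is an
    assumption about the later change, not part of this patch):
    
        @ParameterizedTest
        @ValueSource(strings = Array("zk", "kraft"))  // this commit ships Array("zk") only
        def testUncleanLeaderElectionEnabled(quorum: String): Unit = {
          // test body unchanged: the helpers it calls (createTopicWithAdmin,
          // awaitLeaderChange, produceMessage) are quorum-agnostic
        }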
---
 .../integration/UncleanLeaderElectionTest.scala    | 224 ++++++++++++---------
 .../scala/unit/kafka/server/FetchRequestTest.scala |   6 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala |   4 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   6 +-
 .../server/OffsetsForLeaderEpochRequestTest.scala  |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  28 ++-
 6 files changed, 161 insertions(+), 109 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 7ed403f0a95..828f6eb111c 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -17,28 +17,29 @@
 
 package kafka.integration
 
-import org.apache.kafka.common.config.{ConfigException, ConfigResource}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
-
+import java.util.Properties
+import java.util.concurrent.ExecutionException
 import scala.util.Random
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq}
-import org.apache.log4j.{Level, Logger}
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
-import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.{InvalidConfigurationException, TimeoutException}
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsResult, Config, ConfigEntry}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, AlterConfigsResult, ConfigEntry}
 import org.apache.kafka.server.config.ReplicationConfigs
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.log4j.{Level, Logger}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
-
-import scala.annotation.nowarn
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import com.yammer.metrics.core.Meter
 
 class UncleanLeaderElectionTest extends QuorumTestHarness {
   val brokerId1 = 0
@@ -52,11 +53,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   var configProps2: Properties = _
 
   var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+  var brokers: Seq[KafkaBroker] = Seq.empty[KafkaBroker]
+
+  var admin: Admin = _
 
   val random = new Random()
   val topic = "topic" + random.nextLong()
   val partitionId = 0
+  val topicPartition = new TopicPartition(topic, partitionId)
 
   val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis])
   val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
@@ -65,8 +69,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
 
-    configProps1 = createBrokerConfig(brokerId1, zkConnect)
-    configProps2 = createBrokerConfig(brokerId2, zkConnect)
+    configProps1 = createBrokerConfig(brokerId1, zkConnectOrNull)
+    configProps2 = createBrokerConfig(brokerId2, zkConnectOrNull)
 
     for (configProps <- List(configProps1, configProps2)) {
       configProps.put("controlled.shutdown.enable", 
enableControlledShutdown.toString)
@@ -81,50 +85,57 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-    servers.foreach(server => shutdownServer(server))
-    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    brokers.foreach(broker => shutdownBroker(broker))
+    brokers.foreach(broker => CoreUtils.delete(broker.config.logDirs))
 
     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)
     networkProcessorLogger.setLevel(Level.ERROR)
 
+    admin.close()
+
     super.tearDown()
   }
 
   private def startBrokers(cluster: Seq[Properties]): Unit = {
     for (props <- cluster) {
       val config = KafkaConfig.fromProps(props)
-      val server = createServer(config)
+      val broker = createBroker(config = config)
       configs ++= List(config)
-      servers ++= List(server)
+      brokers ++= List(broker)
     }
+
+    val adminConfigs = new Properties
+    admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), adminConfigs)
   }
 
-  @Test
-  def testUncleanLeaderElectionEnabled(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUncleanLeaderElectionEnabled(quorum: String): Unit = {
     // enable unclean leader election
     configProps1.put("unclean.leader.election.enable", "true")
     configProps2.put("unclean.leader.election.enable", "true")
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers)
-
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
     verifyUncleanLeaderElectionEnabled()
   }
 
-  @Test
-  def testUncleanLeaderElectionDisabled(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUncleanLeaderElectionDisabled(quorum: String): Unit = {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers)
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionDisabled()
   }
 
-  @Test
-  def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUncleanLeaderElectionEnabledByTopicOverride(quorum: String): Unit = {
     // disable unclean leader election globally, but enable for our specific test topic
     configProps1.put("unclean.leader.election.enable", "false")
@@ -133,13 +144,14 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
 
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
     val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", "true")
-    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps)
+    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig = topicProps)
 
     verifyUncleanLeaderElectionEnabled()
   }
 
-  @Test
-  def testUncleanLeaderElectionDisabledByTopicOverride(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUncleanLeaderElectionDisabledByTopicOverride(quorum: String): Unit = {
     // enable unclean leader election globally, but disable for our specific test topic
     configProps1.put("unclean.leader.election.enable", "true")
@@ -148,58 +160,64 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
 
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
     val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", "false")
-    TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), servers, topicProps)
+    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig = topicProps)
 
     verifyUncleanLeaderElectionDisabled()
   }
 
-  @Test
-  def testUncleanLeaderElectionInvalidTopicOverride(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUncleanLeaderElectionInvalidTopicOverride(quorum: String): Unit = {
     startBrokers(Seq(configProps1))
 
     // create topic with an invalid value for unclean leader election
     val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", "invalid")
+    topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "invalid")
+
+    val e = assertThrows(classOf[ExecutionException],
+      () => TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)), topicConfig = topicProps))
 
-    assertThrows(classOf[ConfigException],
-      () => TestUtils.createTopic(zkClient, topic, Map(partitionId -> Seq(brokerId1)), servers, topicProps))
+    assertEquals(classOf[InvalidConfigurationException], e.getCause.getClass)
   }
 
   def verifyUncleanLeaderElectionEnabled(): Unit = {
     // wait until leader is elected
-    val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
-    debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
+    val leaderId = awaitLeaderChange(brokers, topicPartition)
+    debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
     assertTrue(leaderId == brokerId1 || leaderId == brokerId2,
       "Leader id is set to expected value for topic: " + topic)
 
     // the non-leader broker is the follower
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-    debug("Follower for " + topic  + " is: %s".format(followerId))
+    debug("Follower for " + topic + " is: %s".format(followerId))
 
-    produceMessage(servers, topic, "first")
-    waitForPartitionMetadata(servers, topic, partitionId)
+    produceMessage(brokers, topic, "first")
+    waitForPartitionMetadata(brokers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+    brokers.filter(broker => broker.config.brokerId == followerId).map(broker => shutdownBroker(broker))
 
-    produceMessage(servers, topic, "second")
+    produceMessage(brokers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
-    //remove any previous unclean election metric
-    servers.map(_.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+    //verify that unclean election metric count is 0
+    val uncleanLeaderElectionsPerSecGauge = getGauge("UncleanLeaderElectionsPerSec")
+    @volatile var uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 0)
 
     // shutdown leader and then restart follower
-    servers.filter(_.config.brokerId == leaderId).map(shutdownServer)
-    val followerServer = servers.find(_.config.brokerId == followerId).get
-    followerServer.startup()
+    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
+    val followerBroker = brokers.find(_.config.brokerId == followerId).get
+    followerBroker.startup()
 
     // wait until new leader is (uncleanly) elected
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
-    assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(followerId), timeout = 30000)
+    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 1)
 
-    produceMessage(servers, topic, "third")
+    produceMessage(brokers, topic, "third")
 
     // second message was lost due to unclean election
     assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
@@ -207,7 +225,7 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
 
   def verifyUncleanLeaderElectionDisabled(): Unit = {
     // wait until leader is elected
-    val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    val leaderId = awaitLeaderChange(brokers, topicPartition)
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
     assertTrue(leaderId == brokerId1 || leaderId == brokerId2,
       "Leader id is set to expected value for topic: " + topic)
@@ -216,60 +234,70 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(servers, topic, "first")
-    waitForPartitionMetadata(servers, topic, partitionId)
+    produceMessage(brokers, topic, "first")
+    waitForPartitionMetadata(brokers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).foreach(server => shutdownServer(server))
+    brokers.filter(broker => broker.config.brokerId == followerId).map(broker => shutdownBroker(broker))
 
-    produceMessage(servers, topic, "second")
+    produceMessage(brokers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
     //remove any previous unclean election metric
-    servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+    val uncleanLeaderElectionsPerSecGauge = getGauge("UncleanLeaderElectionsPerSec")
+    @volatile var uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 0)
 
     // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
-    val followerServer = servers.find(_.config.brokerId == followerId).get
+    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
+    val followerServer = brokers.find(_.config.brokerId == followerId).get
     followerServer.startup()
 
     // verify that unclean election to non-ISR follower does not occur
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1))
-    assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(leaderId))
+    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 0)
 
     // message production and consumption should both fail while leader is down
-    val e = assertThrows(classOf[ExecutionException], () => produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000))
+    val e = assertThrows(classOf[ExecutionException], () => produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000))
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
 
     assertEquals(List.empty[String], consumeAllMessages(topic, 0))
 
     // restart leader temporarily to send a successfully replicated message
-    servers.filter(server => server.config.brokerId == leaderId).foreach(server => server.startup())
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
+    brokers.find(_.config.brokerId == leaderId).get.startup()
+    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(leaderId))
 
-    produceMessage(servers, topic, "third")
+    produceMessage(brokers, topic, "third")
     //make sure follower server joins the ISR
     TestUtils.waitUntilTrue(() => {
       val partitionInfoOpt = followerServer.metadataCache.getPartitionInfo(topic, partitionId)
       partitionInfoOpt.isDefined && partitionInfoOpt.get.isr.contains(followerId)
     }, "Inconsistent metadata after first server startup")
 
-    servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
-    // verify clean leader transition to ISR follower
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
 
+    // verify clean leader transition to ISR follower
+    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(followerId))
     // verify messages can be consumed from ISR follower that was just promoted to leader
     assertEquals(List("first", "second", "third"), consumeAllMessages(topic, 3))
   }
 
-  private def shutdownServer(server: KafkaServer): Unit = {
-    server.shutdown()
-    server.awaitShutdown()
+  private def getGauge(metricName: String) = {
+    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .find { case (k, _) => k.getName.endsWith(metricName) }
+      .getOrElse(throw new AssertionError("Unable to find metric " + metricName))
+      ._2.asInstanceOf[Meter]
+  }
+
+  private def shutdownBroker(broker: KafkaBroker) = {
+    broker.shutdown()
+    broker.awaitShutdown()
   }
 
   private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] = {
-    val brokerList = TestUtils.plaintextBootstrapServers(servers)
+    val brokerList = TestUtils.plaintextBootstrapServers(brokers)
     // Don't rely on coordinator as it may be down when this method is called
     val consumer = TestUtils.createConsumer(brokerList,
       groupId = "group" + random.nextLong(),
@@ -283,42 +311,48 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     } finally consumer.close()
   }
 
-  @Test
-  def testTopicUncleanLeaderElectionEnable(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testTopicUncleanLeaderElectionEnableWithAlterTopicConfigs(quorum: String): Unit = {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    adminZkClient.createTopicWithAssignment(topic, config = new Properties(), Map(partitionId -> Seq(brokerId1, brokerId2)))
+    TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, replicaAssignment = Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     // wait until leader is elected
-    val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    val leaderId = awaitLeaderChange(brokers, topicPartition)
 
     // the non-leader broker is the follower
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
 
-    produceMessage(servers, topic, "first")
-    waitForPartitionMetadata(servers, topic, partitionId)
+    produceMessage(brokers, topic, "first")
+    waitForPartitionMetadata(brokers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic, 1))
 
     // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+    brokers.filter(broker => broker.config.brokerId == followerId).map(broker => shutdownBroker(broker))
 
-    produceMessage(servers, topic, "second")
+    produceMessage(brokers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
 
-    //remove any previous unclean election metric
-    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+    //verify that unclean election metric count is 0
+    val uncleanLeaderElectionsPerSecGauge = getGauge("UncleanLeaderElectionsPerSec")
+    @volatile var uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 0)
 
     // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    val followerServer = servers.find(_.config.brokerId == followerId).get
-    followerServer.startup()
+    brokers.filter(_.config.brokerId == leaderId).map(shutdownBroker)
+    val followerBroker = brokers.find(_.config.brokerId == followerId).get
+    followerBroker.startup()
 
-    assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+    // leader should not change
+    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(leaderId), timeout = 30000)
+    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 0)
 
     // message production and consumption should both fail while leader is down
-    val e = assertThrows(classOf[ExecutionException], () => produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000))
+    val e = assertThrows(classOf[ExecutionException], () => produceMessage(brokers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000))
     assertEquals(classOf[TimeoutException], e.getCause.getClass)
 
     assertEquals(List.empty[String], consumeAllMessages(topic, 0))
@@ -331,26 +365,26 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
     adminClient.close()
 
     // wait until new leader is (uncleanly) elected
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
-    assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+    awaitLeaderChange(brokers, topicPartition, expectedLeaderOpt = Some(followerId), timeout = 30000)
+    uncleanLeaderElectionsPerSec = uncleanLeaderElectionsPerSecGauge.count()
+    assert(uncleanLeaderElectionsPerSec == 1)
 
-    produceMessage(servers, topic, "third")
+    produceMessage(brokers, topic, "third")
 
     // second message was lost due to unclean election
     assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
   }
 
-  @nowarn("cat=deprecation")
   private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = {
     val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
-    val newConfig = new Config(configEntries)
-    val configs = Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> newConfig).asJava
-    adminClient.alterConfigs(configs)
+    adminClient.incrementalAlterConfigs(Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) ->
+      configEntries.asScala.map((e: ConfigEntry) => new AlterConfigOp(e, AlterConfigOp.OpType.SET)).toSeq
+        .asJavaCollection).asJava)
   }
 
   private def createAdminClient(): Admin = {
     val config = new Properties
-    val bootstrapServers = TestUtils.plaintextBootstrapServers(servers)
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
     config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
     Admin.create(config)
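
The alterTopicConfigs helper above now goes through Admin#incrementalAlterConfigs
instead of the deprecated Admin#alterConfigs, which is why the @nowarn annotation
could be dropped. A minimal standalone sketch of the same call shape, assuming a
broker at localhost:9092 and a topic named "my-topic" (both placeholders, not
taken from this patch):

    import java.util.Properties
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)
    try {
      // SET one topic config incrementally, leaving all other configs untouched
      val op = new AlterConfigOp(
        new ConfigEntry("unclean.leader.election.enable", "true"),
        AlterConfigOp.OpType.SET)
      val resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
      admin.incrementalAlterConfigs(Map(resource -> Seq(op).asJavaCollection).asJava).all().get()
    } finally admin.close()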
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 2411e612d20..0a4f50ba120 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -223,7 +223,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     // Force a leader change
     killBroker(firstLeaderId)
     // Write some more data in epoch 1
-    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId))
     val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
     val secondEpochResponses = produceData(Seq(topicPartition), 100)
     val secondEpochEndOffset = secondEpochResponses.lastOption.get.offset + 1
@@ -285,7 +285,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     killBroker(firstLeaderId)
 
     // Check leader error codes
-    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId))
     val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch))
@@ -322,7 +322,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     // -1 is treated as having no epoch at all
     killBroker(firstLeaderId)
 
-    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId))
     val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
     verifyFetchSessionErrors(topicPartition, secondLeaderEpoch, secondLeaderId, version)
 
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 48397a4b71d..358005cb609 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -125,7 +125,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     killBroker(firstLeaderId)
 
     // Check leader error codes
-    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId))
     val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch))
@@ -198,7 +198,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
     // Kill the first leader so that we can verify the epoch change when fetching the latest offset
     killBroker(firstLeaderId)
-    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, partition, firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, partition, oldLeaderOpt = Some(firstLeaderId))
     // make sure high watermark of new leader has caught up
     TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1).errorCode != Errors.OFFSET_NOT_AVAILABLE.code,
       "the second leader does not sync to follower")
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index a3bdb9a1723..21dba44f9f7 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -138,7 +138,7 @@ class LogRecoveryTest extends QuorumTestHarness {
     assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
 
     // check if leader moves to the other server
-    leader = awaitLeaderChange(servers, topicPartition, leader)
+    leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader))
     assertEquals(1, leader, "Leader must move to broker 1")
 
     // bring the preferred replica back
@@ -166,7 +166,7 @@ class LogRecoveryTest extends QuorumTestHarness {
 
     server2.startup()
     updateProducer()
-    leader = awaitLeaderChange(servers, topicPartition, leader)
+    leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader))
     assertTrue(leader == 0 || leader == 1,
       "Leader must remain on broker 0, in case of ZooKeeper session expiration 
it can move to broker 1")
 
@@ -221,7 +221,7 @@ class LogRecoveryTest extends QuorumTestHarness {
     server2.startup()
     updateProducer()
     // check if leader moves to the other server
-    leader = awaitLeaderChange(servers, topicPartition, leader)
+    leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader))
     assertEquals(1, leader, "Leader must move to broker 1")
 
     assertEquals(hw, hwFile1.read().getOrElse(topicPartition, 0L))
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 78c10ec5ec4..29e1b70a05d 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -79,7 +79,7 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     killBroker(firstLeaderId)
 
     // Check leader error codes
-    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, oldLeaderOpt = Some(firstLeaderId))
     val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 35b7ce418d1..5eb3187ff30 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1099,13 +1099,31 @@ object TestUtils extends Logging {
   def awaitLeaderChange[B <: KafkaBroker](
       brokers: Seq[B],
       tp: TopicPartition,
-      oldLeader: Int,
+      oldLeaderOpt: Option[Int] = None,
+      expectedLeaderOpt: Option[Int] = None,
       timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
     def newLeaderExists: Option[Int] = {
-      brokers.find { broker =>
-        broker.config.brokerId != oldLeader &&
-          broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
-      }.map(_.config.brokerId)
+      if (expectedLeaderOpt.isDefined) {
+        debug(s"Checking leader that has changed to ${expectedLeaderOpt.get}")
+        brokers.find { broker =>
+          broker.config.brokerId == expectedLeaderOpt.get &&
+            broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
+        }.map(_.config.brokerId)
+
+      } else if (oldLeaderOpt.isDefined) {
+        debug(s"Checking leader that has changed from ${oldLeaderOpt.get}")
+        brokers.find { broker =>
+          broker.config.brokerId != oldLeaderOpt.get &&
+            broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
+        }.map(_.config.brokerId)
+
+      } else {
+        debug("Checking the elected leader")
+        brokers.find { broker =>
+          broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
+        }.map(_.config.brokerId)
+      }
     }
 
     waitUntilTrue(() => newLeaderExists.isDefined,
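
For reference, a short usage sketch of the widened helper above (brokers and tp
stand for the caller's broker list and TopicPartition; the values are
illustrative, not taken from this patch):

    // wait for any broker to hold leadership of tp
    val leaderId = TestUtils.awaitLeaderChange(brokers, tp)
    // wait for leadership to move off a known old leader
    val newLeaderId = TestUtils.awaitLeaderChange(brokers, tp, oldLeaderOpt = Some(leaderId))
    // wait until one specific broker holds leadership, with a longer timeout
    TestUtils.awaitLeaderChange(brokers, tp, expectedLeaderOpt = Some(newLeaderId), timeout = 30000)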
