[ https://issues.apache.org/jira/browse/KAFKA-6835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586204#comment-16586204 ]

ASF GitHub Bot commented on KAFKA-6835:
---------------------------------------

junrao closed pull request #4957: KAFKA-6835: Enable topic unclean leader election to be enabled without controller change
URL: https://github.com/apache/kafka/pull/4957
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a forked pull request is merged, the diff is reproduced
below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index d2473058ac7..aa41c7f5458 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -94,7 +94,11 @@ object ControllerState {
     def value = 13
   }
 
+  case object TopicUncleanLeaderElectionEnable extends ControllerState {
+    def value = 14
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
     PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
-    LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
+    LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable)
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e397e80db66..f6ea43da062 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -202,6 +202,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     eventManager.put(UncleanLeaderElectionEnable)
   }
 
+  private[kafka] def enableTopicUncleanLeaderElection(topic: String): Unit = {
+    if (isActive) {
+      eventManager.put(TopicUncleanLeaderElectionEnable(topic))
+    }
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
@@ -1025,6 +1031,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent {
+
+    def state = ControllerState.TopicUncleanLeaderElectionEnable
+
+    override def process(): Unit = {
+      if (!isActive) return
+      partitionStateMachine.triggerOnlinePartitionStateChange(topic)
+    }
+  }
+
   case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
 
     def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index db4c7161f35..11e38d46ffd 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -97,6 +97,14 @@ class PartitionStateMachine(config: KafkaConfig,
    * state. This is called on a successful controller election and on broker changes
    */
   def triggerOnlinePartitionStateChange() {
+    triggerOnlinePartitionStateChange(partitionState.toMap)
+  }
+
+  def triggerOnlinePartitionStateChange(topic: String) {
+    triggerOnlinePartitionStateChange(partitionState.filterKeys(p => p.topic.equals(topic)).toMap)
+  }
+
+  def triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]) {
     // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
     // that belong to topics to be deleted
     val partitionsToTrigger = partitionState.filter { case (partition, partitionState) =>
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 78c3abf164a..5593225f0e6 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import DynamicConfig.Broker._
 import kafka.api.ApiVersion
+import kafka.controller.KafkaController
 import kafka.log.{LogConfig, LogManager}
 import kafka.security.CredentialProvider
 import kafka.server.Constants._
@@ -33,6 +34,7 @@ import org.apache.kafka.common.metrics.Quota._
 import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 /**
   * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@@ -45,7 +47,7 @@ trait ConfigHandler {
   * The TopicConfigHandler will process topic config changes in ZK.
   * The callback provides the topic name and the full properties set read from ZK
   */
-class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers) extends ConfigHandler with Logging  {
+class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging  {
 
   def processConfigChanges(topic: String, topicConfig: Properties) {
     // Validate the configurations.
@@ -74,6 +76,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
     }
     updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
     updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
+
+    if (Try(topicConfig.getProperty(KafkaConfig.UncleanLeaderElectionEnableProp).toBoolean).getOrElse(false)) {
+      kafkaController.enableTopicUncleanLeaderElection(topic)
+    }
   }
 
   def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int] = {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0ac877d3b4a..c2a49a15297 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -304,7 +304,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         config.dynamicConfig.addReconfigurables(this)
 
         /* start dynamic config manager */
-        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
+        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                            ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                            ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 0250cfbee82..89fcebf5b1a 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.junit.Assert._
 
 class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -238,7 +239,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     // message production and consumption should both fail while leader is down
     try {
-      produceMessage(servers, topic, "third")
+      produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, 
requestTimeoutMs = 1000)
       fail("Message produced while leader is down should fail, but it 
succeeded")
     } catch {
       case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
@@ -280,4 +281,73 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
       TestUtils.consumeRecords(consumer, numMessages).map(_.value)
     } finally consumer.close()
   }
+
+  @Test
+  def testTopicUncleanLeaderElectionEnable(): Unit = {
+    // unclean leader election is disabled by default
+    startBrokers(Seq(configProps1, configProps2))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+
+    // wait until leader is elected
+    val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+
+    // the non-leader broker is the follower
+    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
+
+    produceMessage(servers, topic, "first")
+    waitUntilMetadataIsPropagated(servers, topic, partitionId)
+    assertEquals(List("first"), consumeAllMessages(topic, 1))
+
+    // shutdown follower server
+    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+
+    produceMessage(servers, topic, "second")
+    assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
+
+    //remove any previous unclean election metric
+    servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
+    // shutdown leader and then restart follower
+    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    val followerServer = servers.find(_.config.brokerId == followerId).get
+    followerServer.startup()
+
+    assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+
+    // message production and consumption should both fail while leader is down
+    try {
+      produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, 
requestTimeoutMs = 1000)
+      fail("Message produced while leader is down should fail, but it 
succeeded")
+    } catch {
+      case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
+    }
+
+    assertEquals(List.empty[String], consumeAllMessages(topic, 0))
+
+    // Enable unclean leader election for topic
+    val adminClient = createAdminClient()
+    val newProps = new Properties
+    newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+    TestUtils.alterTopicConfigs(adminClient, topic, newProps).all.get
+    adminClient.close()
+
+    // wait until new leader is (uncleanly) elected
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+    assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+
+    produceMessage(servers, topic, "third")
+
+    // second message was lost due to unclean election
+    assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
+  }
+
+  private def createAdminClient(): AdminClient = {
+    val config = new Properties
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
+    AdminClient.create(config)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index f5c5c9b2802..510c4a3e273 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -233,7 +233,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
   @Test
   def shouldParseReplicationQuotaProperties(): Unit = {
-    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
     val props: Properties = new Properties()
 
     //Given
@@ -246,7 +246,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
   @Test
   def shouldParseWildcardReplicationQuotaProperties(): Unit = {
-    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
     val props: Properties = new Properties()
 
     //Given
@@ -261,7 +261,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
   @Test
   def shouldParseReplicationQuotaReset(): Unit = {
-    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
     val props: Properties = new Properties()
 
     //Given
@@ -276,7 +276,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
   @Test
   def shouldParseRegardlessOfWhitespaceAroundValues() {
-    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
     assertEquals(AllReplicas, parse(configHandler, "* "))
     assertEquals(Seq(), parse(configHandler, " "))
     assertEquals(Seq(6), parse(configHandler, "6:102"))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e6f022df86..2ca3a6c986d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -943,10 +943,15 @@ object TestUtils extends Logging {
     values
   }
 
-  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
-    val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers))
-    producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
-    producer.close()
+  def produceMessage(servers: Seq[KafkaServer], topic: String, message: String,
+                     deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000) {
+    val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers),
+      deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
+    try {
+      producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
+    } finally {
+      producer.close()
+    }
   }
 
   def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
@@ -1306,6 +1311,13 @@ object TestUtils extends Logging {
     adminClient.alterConfigs(configs)
   }
 
+  def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = {
+    val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
+    val newConfig = new Config(configEntries)
+    val configs = Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> newConfig).asJava
+    adminClient.alterConfigs(configs)
+  }
+
   /**
    * Capture the console output during the execution of the provided function.
    */
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 979190db7b7..3b7ee853454 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -41,6 +41,10 @@ <h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading 
from 0.8.x, 0.9.x, 0.1
         Even though the old <code>Describe Cluster</code> access is still 
supported for backward compatibility, using it for this API is not advised.</li>
 </ol>
 
+<h5><a id="upgrade_210_notable" href="#upgrade_210_notable">Notable changes in 
2.1.0</a></h5>
+<ul>
+    <li>Unclean leader election is automatically enabled by the controller 
when <code>unclean.leader.election.enable</code> config is dynamically updated 
by using per-topic config override.</li>
+</ul>
 
 <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4>
 <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended 
rolling upgrade plan below,
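
For reference, a minimal sketch of how the per-topic override described in the note above could be applied with the 2.1-era AdminClient, mirroring the alterTopicConfigs/createAdminClient helpers in the diff. The bootstrap address and topic name ("localhost:9092", "my-topic") are placeholders, not values taken from this patch:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object EnableTopicUncleanElection extends App {
      val props = new Properties
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
      val admin = AdminClient.create(props)
      try {
        val resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic") // placeholder topic
        val entries = Collections.singletonList(new ConfigEntry("unclean.leader.election.enable", "true"))
        // With this patch, the active controller reacts to the resulting config change
        // notification immediately instead of only after a controller failover.
        admin.alterConfigs(Collections.singletonMap(resource, new Config(entries))).all.get
      } finally admin.close()
    }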


 



> Enable topic unclean leader election to be enabled without controller change
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6835
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Manikumar
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Dynamic updates of the broker's default unclean.leader.election.enable will be
> processed without a controller change (KAFKA-6526). We should probably do the
> same for topic overrides as well.
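
As context for the KAFKA-6526 relationship mentioned above, here is a hedged sketch of the two AdminClient config targets involved. Treating an empty broker resource name as the cluster-wide dynamic default is my assumption (per KIP-226), and "my-topic" is a placeholder; neither comes from this patch:

    import org.apache.kafka.common.config.ConfigResource

    object UncleanElectionConfigTargets {
      // KAFKA-6526 path: the cluster-wide dynamic broker default ("" is assumed to
      // denote the default broker resource introduced by KIP-226).
      val brokerDefault = new ConfigResource(ConfigResource.Type.BROKER, "")
      // This PR's path: a per-topic override on an individual topic.
      val topicOverride = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
      // Setting unclean.leader.election.enable=true on either resource now takes
      // effect on the active controller without waiting for a controller change.
    }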


