[ https://issues.apache.org/jira/browse/KAFKA-6835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586204#comment-16586204 ]
ASF GitHub Bot commented on KAFKA-6835: --------------------------------------- junrao closed pull request #4957: KAFKA-6835: Enable topic unclean leader election to be enabled without controller change URL: https://github.com/apache/kafka/pull/4957 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala index d2473058ac7..aa41c7f5458 100644 --- a/core/src/main/scala/kafka/controller/ControllerState.scala +++ b/core/src/main/scala/kafka/controller/ControllerState.scala @@ -94,7 +94,11 @@ object ControllerState { def value = 13 } + case object TopicUncleanLeaderElectionEnable extends ControllerState { + def value = 14 + } + val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived, - LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable) + LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e397e80db66..f6ea43da062 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -202,6 +202,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti eventManager.put(UncleanLeaderElectionEnable) } + private[kafka] def enableTopicUncleanLeaderElection(topic: String): Unit = { + if (isActive) { + eventManager.put(TopicUncleanLeaderElectionEnable(topic)) + } + } + private def state: ControllerState = eventManager.state /** @@ -1025,6 +1031,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } + case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent { + + def state = ControllerState.TopicUncleanLeaderElectionEnable + + override def process(): Unit = { + if (!isActive) return + partitionStateMachine.triggerOnlinePartitionStateChange(topic) + } + } + case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent { def state = ControllerState.ControlledShutdown diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index db4c7161f35..11e38d46ffd 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -97,6 +97,14 @@ class PartitionStateMachine(config: KafkaConfig, * state. This is called on a successful controller election and on broker changes */ def triggerOnlinePartitionStateChange() { + triggerOnlinePartitionStateChange(partitionState.toMap) + } + + def triggerOnlinePartitionStateChange(topic: String) { + triggerOnlinePartitionStateChange(partitionState.filterKeys(p => p.topic.equals(topic)).toMap) + } + + def triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]) { // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions // that belong to topics to be deleted val partitionsToTrigger = partitionState.filter { case (partition, partitionState) => diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 78c3abf164a..5593225f0e6 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -21,6 +21,7 @@ import java.util.Properties import DynamicConfig.Broker._ import kafka.api.ApiVersion +import kafka.controller.KafkaController import kafka.log.{LogConfig, LogManager} import kafka.security.CredentialProvider import kafka.server.Constants._ @@ -33,6 +34,7 @@ import org.apache.kafka.common.metrics.Quota._ import org.apache.kafka.common.utils.Sanitizer import scala.collection.JavaConverters._ +import scala.util.Try /** * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager @@ -45,7 +47,7 @@ trait ConfigHandler { * The TopicConfigHandler will process topic config changes in ZK. * The callback provides the topic name and the full properties set read from ZK */ -class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers) extends ConfigHandler with Logging { +class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging { def processConfigChanges(topic: String, topicConfig: Properties) { // Validate the configurations. @@ -74,6 +76,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC } updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader) updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower) + + if (Try(topicConfig.getProperty(KafkaConfig.UncleanLeaderElectionEnableProp).toBoolean).getOrElse(false)) { + kafkaController.enableTopicUncleanLeaderElection(topic) + } } def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int] = { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0ac877d3b4a..c2a49a15297 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -304,7 +304,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP config.dynamicConfig.addReconfigurables(this) /* start dynamic config manager */ - dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers), + dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 0250cfbee82..89fcebf5b1a 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.junit.Assert._ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -238,7 +239,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // message production and consumption should both fail while leader is down try { - produceMessage(servers, topic, "third") + produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000) fail("Message produced while leader is down should fail, but it succeeded") } catch { case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected @@ -280,4 +281,73 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { TestUtils.consumeRecords(consumer, numMessages).map(_.value) } finally consumer.close() } + + @Test + def testTopicUncleanLeaderElectionEnable(): Unit = { + // unclean leader election is disabled by default + startBrokers(Seq(configProps1, configProps2)) + + // create topic with 1 partition, 2 replicas, one on each broker + adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + + // wait until leader is elected + val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) + + // the non-leader broker is the follower + val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 + + produceMessage(servers, topic, "first") + waitUntilMetadataIsPropagated(servers, topic, partitionId) + assertEquals(List("first"), consumeAllMessages(topic, 1)) + + // shutdown follower server + servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) + + produceMessage(servers, topic, "second") + assertEquals(List("first", "second"), consumeAllMessages(topic, 2)) + + //remove any previous unclean election metric + servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")) + + // shutdown leader and then restart follower + servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) + val followerServer = servers.find(_.config.brokerId == followerId).get + followerServer.startup() + + assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) + + // message production and consumption should both fail while leader is down + try { + produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000) + fail("Message produced while leader is down should fail, but it succeeded") + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected + } + + assertEquals(List.empty[String], consumeAllMessages(topic, 0)) + + // Enable unclean leader election for topic + val adminClient = createAdminClient() + val newProps = new Properties + newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true") + TestUtils.alterTopicConfigs(adminClient, topic, newProps).all.get + adminClient.close() + + // wait until new leader is (uncleanly) elected + waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) + assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count()) + + produceMessage(servers, topic, "third") + + // second message was lost due to unclean election + assertEquals(List("first", "third"), consumeAllMessages(topic, 2)) + } + + private def createAdminClient(): AdminClient = { + val config = new Properties + val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT")) + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10") + AdminClient.create(config) + } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index f5c5c9b2802..510c4a3e273 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -233,7 +233,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def shouldParseReplicationQuotaProperties(): Unit = { - val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null) val props: Properties = new Properties() //Given @@ -246,7 +246,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def shouldParseWildcardReplicationQuotaProperties(): Unit = { - val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null) val props: Properties = new Properties() //Given @@ -261,7 +261,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def shouldParseReplicationQuotaReset(): Unit = { - val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null) val props: Properties = new Properties() //Given @@ -276,7 +276,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def shouldParseRegardlessOfWhitespaceAroundValues() { - val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null) assertEquals(AllReplicas, parse(configHandler, "* ")) assertEquals(Seq(), parse(configHandler, " ")) assertEquals(Seq(6), parse(configHandler, "6:102")) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1e6f022df86..2ca3a6c986d 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -943,10 +943,15 @@ object TestUtils extends Logging { values } - def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) { - val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers)) - producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get - producer.close() + def produceMessage(servers: Seq[KafkaServer], topic: String, message: String, + deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000) { + val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers), + deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs) + try { + producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get + } finally { + producer.close() + } } def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) { @@ -1306,6 +1311,13 @@ object TestUtils extends Logging { adminClient.alterConfigs(configs) } + def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = { + val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava + val newConfig = new Config(configEntries) + val configs = Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> newConfig).asJava + adminClient.alterConfigs(configs) + } + /** * Capture the console output during the execution of the provided function. */ diff --git a/docs/upgrade.html b/docs/upgrade.html index 979190db7b7..3b7ee853454 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -41,6 +41,10 @@ <h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.1 Even though the old <code>Describe Cluster</code> access is still supported for backward compatibility, using it for this API is not advised.</li> </ol> +<h5><a id="upgrade_210_notable" href="#upgrade_210_notable">Notable changes in 2.1.0</a></h5> +<ul> + <li>Unclean leader election is automatically enabled by the controller when <code>unclean.leader.election.enable</code> config is dynamically updated by using per-topic config override.</li> +</ul> <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4> <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable topic unclean leader election to be enabled without controller change > ---------------------------------------------------------------------------- > > Key: KAFKA-6835 > URL: https://issues.apache.org/jira/browse/KAFKA-6835 > Project: Kafka > Issue Type: Task > Components: core > Reporter: Rajini Sivaram > Assignee: Manikumar > Priority: Major > Fix For: 2.1.0 > > > Dynamic update of broker's default unclean.leader.election.enable will be > processed without controller change (KAFKA-6526). We should probably do the > same for topic overrides as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)