This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3747f55 KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374) 3747f55 is described below commit 3747f553366a50ae123c6176cd339e078bed33c1 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Wed Mar 6 09:55:05 2019 +0000 KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374) Ensure that the controller is not shut down in the test. Reviewers: Manikumar Reddy <manikumar.re...@gmail.com> --- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index c13b0a3..80ed131 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -32,7 +32,6 @@ import com.yammer.metrics.Metrics import com.yammer.metrics.core.MetricName import kafka.admin.ConfigCommand import kafka.api.{KafkaSasl, SaslSetup} -import kafka.coordinator.group.OffsetConfig import kafka.log.LogConfig import kafka.message.ProducerCompressionCodec import kafka.network.{Processor, RequestChannel} @@ -133,7 +132,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers) - TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions, + TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions, replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs) createAdminClient(SecurityProtocol.SSL, SecureInternal) @@ -445,8 +444,14 @@ class 
DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @Test def testUncleanLeaderElectionEnable(): Unit = { + val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get + val controllerId = controller.config.brokerId + + // Create a topic with two replicas on brokers other than the controller val topic = "testtopic2" - TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers) + val assignment = Map(0 -> Seq((controllerId + 1) % servers.size, (controllerId + 2) % servers.size)) + TestUtils.createTopic(zkClient, topic, assignment, servers) + val producer = ProducerBuilder().acks(1).build() val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build() verifyProduceConsume(producer, consumer, numRecords = 10, topic) @@ -472,7 +477,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet leaderBroker.shutdown() leaderBroker.awaitShutdown() followerBroker.startup() - val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get // Verify that new leader is not elected with unclean leader disabled since there are no ISRs TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected") @@ -928,7 +932,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet .build() verifyProduceConsume(producer1, consumer1, numRecords = 10, topic) // send another message to check consumer later - producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS) + producer1.send(new ProducerRecord(topic, "key", "value")).get(1, TimeUnit.SECONDS) val config = servers.head.config val existingListenerCount = config.listeners.size