This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 16d4d8c MINOR: Fix flaky ConsumerTopicCreationTest (#6727) 16d4d8c is described below commit 16d4d8cafc7394a35caf3354b449505bde56920f Author: Dhruvil Shah <dhru...@confluent.io> AuthorDate: Tue May 14 15:14:49 2019 -0700 MINOR: Fix flaky ConsumerTopicCreationTest (#6727) `ConsumerTopicCreationTest` relied on `KafkaConsumer#poll` to send a `MetadataRequest` within 100ms to verify if a topic is auto created or not. This is brittle and does not guarantee if the request made it to the broker or was processed successfully. This PR fixes the flaky test by adding another topic; we wait until we consume a previously produced record to this topic. This ensures MetadataRequest was processed and we could then check if the topic we're interested in was created or not. Reviewers: Boyang Chen <bche...@outlook.com>, Jason Gustafson <ja...@confluent.io> --- .../kafka/api/ConsumerTopicCreationTest.scala | 64 ++++++++++------------ .../kafka/api/IntegrationTestHarness.scala | 19 +++++++ 2 files changed, 47 insertions(+), 36 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala index 11fbefd..c145b24 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala @@ -17,23 +17,22 @@ package integration.kafka.api -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.runners.Parameterized.Parameters import java.lang.{Boolean => JBoolean} import java.time.Duration import java.util +import java.util.Collections -import scala.collection.JavaConverters._ import kafka.api.IntegrationTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.utils.Utils -import org.junit.{After, Test} +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} +import org.junit.Assert._ +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters /** * Tests behavior of specifying auto topic creation configuration for the consumer and broker @@ -42,12 +41,10 @@ import org.junit.{After, Test} class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness { override protected def brokerCount: Int = 1 - val topic = "topic" - val part = 0 - val tp = new TopicPartition(topic, part) + val topic_1 = "topic-1" + val topic_2 = "topic-2" val producerClientId = "ConsumerTestProducer" val consumerClientId = "ConsumerTestConsumer" - var adminClient: AdminClient = null // configure server properties this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown @@ -62,36 +59,31 @@ class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consume this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, consumerAllowAutoCreateTopics.toString) - @After - override def tearDown(): Unit = { - if (adminClient != null) - Utils.closeQuietly(adminClient, "AdminClient") - super.tearDown() - } - @Test def testAutoTopicCreation(): Unit = { val consumer = createConsumer() - adminClient = AdminClient.create(createConfig()) + val producer = createProducer() + val adminClient = createAdminClient() + val record = new ProducerRecord(topic_1, 0, "key".getBytes, "value".getBytes) + + // create `topic_1` and produce a record to it + adminClient.createTopics(Collections.singleton(new NewTopic(topic_1, 1, 1))).all.get + producer.send(record).get - consumer.subscribe(util.Arrays.asList(topic)) - consumer.poll(Duration.ofMillis(100)) + consumer.subscribe(util.Arrays.asList(topic_1, topic_2)) - val topicCreated = adminClient.listTopics.names.get.contains(topic) + // Wait until the produced record was consumed. This guarantees that metadata request for `topic_2` was sent to the + // broker. + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(100)).count > 0 + }, "Timed out waiting to consume") + + // MetadataRequest is guaranteed to create the topic znode if creation was required + val topicCreated = zkClient.getAllTopicsInCluster.contains(topic_2) if (brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics) - assert(topicCreated == true) + assertTrue(topicCreated) else - assert(topicCreated == false) - } - - def createConfig(): util.Map[String, Object] = { - val config = new util.HashMap[String, Object] - config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") - val securityProps: util.Map[Object, Object] = - TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) - securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } - config + assertFalse(topicCreated) } } diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 5ffbc43..242a305 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -27,6 +27,7 @@ import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} import org.junit.{After, Before} @@ -42,10 +43,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val producerConfig = new Properties val consumerConfig = new Properties + val adminClientConfig = new Properties val serverConfig = new Properties private val consumers = mutable.Buffer[KafkaConsumer[_, _]]() private val producers = mutable.Buffer[KafkaProducer[_, _]]() + private val adminClients = mutable.Buffer[AdminClient]() protected def interBrokerListenerName: ListenerName = listenerName @@ -84,6 +87,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { // Generate client security properties before starting the brokers in case certs are needed producerConfig ++= clientSecurityProps("producer") consumerConfig ++= clientSecurityProps("consumer") + adminClientConfig ++= clientSecurityProps("adminClient") super.setUp() @@ -98,6 +102,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + if (createOffsetsTopic) TestUtils.createOffsetsTopic(zkClient, servers) } @@ -131,13 +137,26 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { consumer } + def createAdminClient(configOverrides: Properties = new Properties): AdminClient = { + val props = new Properties + props ++= adminClientConfig + props ++= configOverrides + val adminClient = AdminClient.create(props) + adminClients += adminClient + adminClient + } + @After override def tearDown() { producers.foreach(_.close(Duration.ZERO)) consumers.foreach(_.wakeup()) consumers.foreach(_.close(Duration.ZERO)) + adminClients.foreach(_.close(Duration.ZERO)) + producers.clear() consumers.clear() + adminClients.clear() + super.tearDown() }