This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new a947720 KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675) a947720 is described below commit a947720a7ecd5b1e1b8350472a3ccfa02959a939 Author: showuon <43372967+show...@users.noreply.github.com> AuthorDate: Tue May 19 09:43:45 2020 +0800 KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675) Reviewers: Brian Byrne <bby...@confluent.io>, Colin P. McCabe <cmcc...@apache.org> (cherry picked from commit ad0850659f5d536d43f09221c941022fc273e6d5) --- .../src/main/scala/kafka/admin/ConfigCommand.scala | 7 ++-- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 48 ++++++++++++++++++---- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 562a91a..5291a64 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -60,6 +60,7 @@ import scala.collection._ */ object ConfigCommand extends Config { + val BrokerDefaultEntityName = "" val BrokerLoggerConfigType = "broker-loggers" val BrokerSupportedConfigTypes = Seq(ConfigType.Topic, ConfigType.Broker, BrokerLoggerConfigType) val DefaultScramIterations = 4096 @@ -378,12 +379,12 @@ object ConfigCommand extends Config { case ConfigType.Topic => adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq case ConfigType.Broker | BrokerLoggerConfigType => - adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ ConfigEntityName.Default + adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName }) entities.foreach { entity => entity match { - case "" => + case BrokerDefaultEntityName => println(s"Default configs for $entityType in the cluster are:") case _ => val configSourceStr = if (describeAll) "All" else "Dynamic" @@ -408,7 +409,7 @@ object ConfigCommand extends Config { Topic.validate(entityName) (ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)) case ConfigType.Broker => entityName match { - case "" => + case BrokerDefaultEntityName => (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)) case _ => validateBrokerId() diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index e938a6d..b57238d 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -439,12 +439,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } @Test - def shouldAddBrokerDynamicConfig(): Unit = { - val node = new Node(1, "localhost", 9092) - verifyAlterBrokerConfig(node, "1", List("--entity-name", "1")) - } - - @Test def shouldAddBrokerLoggerConfig(): Unit = { val node = new Node(1, "localhost", 9092) verifyAlterBrokerLoggerConfig(node, "1", "1", List( @@ -456,7 +450,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { @Test def testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed(): Unit = { - val optsList = List("--zookeeper", "localhost:9092", + val optsList = List("--zookeeper", zkConnect, "--entity-type", ConfigType.Broker, "--describe" ) @@ -546,6 +540,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { verifyAlterBrokerConfig(node, "", List("--entity-default")) } + @Test + def shouldAddBrokerDynamicConfig(): Unit = { + val node = new Node(1, "localhost", 9092) + verifyAlterBrokerConfig(node, "1", List("--entity-name", "1")) + } + def verifyAlterBrokerConfig(node: Node, resourceName: String, resourceOpts: List[String]): Unit = { val optsList = List("--bootstrap-server", "localhost:9092", "--entity-type", "brokers", @@ -592,6 +592,40 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldDescribeConfigBrokerWithoutEntityName(): Unit = { + val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", "brokers", + "--describe")) + + val BrokerDefaultEntityName = "" + val resourceCustom = new ConfigResource(ConfigResource.Type.BROKER, "1") + val resourceDefault = new ConfigResource(ConfigResource.Type.BROKER, BrokerDefaultEntityName) + val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] + val emptyConfig = new Config(util.Collections.emptyList[ConfigEntry]) + val resultMap = Map(resourceCustom -> emptyConfig, resourceDefault -> emptyConfig).asJava + future.complete(resultMap) + val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult]) + // make sure it will be called 2 times: (1) for broker "1" (2) for default broker "" + EasyMock.expect(describeResult.all()).andReturn(future).times(2) + + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { + assertTrue("Synonyms not requested", options.includeSynonyms()) + val resource = resources.iterator.next + assertEquals(ConfigResource.Type.BROKER, resource.`type`) + assertTrue(resourceCustom.name == resource.name || resourceDefault.name == resource.name) + assertEquals(1, resources.size) + describeResult + } + } + EasyMock.replay(describeResult) + ConfigCommand.describeConfig(mockAdminClient, describeOpts) + EasyMock.verify(describeResult) + EasyMock.reset(describeResult) + } + def verifyAlterBrokerLoggerConfig(node: Node, resourceName: String, entityName: String, describeConfigEntries: List[ConfigEntry]): Unit = { val optsList = List("--bootstrap-server", "localhost:9092",