dengziming commented on code in PR #15304: URL: https://github.com/apache/kafka/pull/15304#discussion_r1475730897
########## core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala: ########## @@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, None, encoderConfigs)) // Dynamic config updates using ZK should fail if broker is running. - registerBrokerInZk(brokerId.toInt) + registerBrokerInZk(zkClient, brokerId.toInt) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId))) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "220000"), None)) // Dynamic config updates using ZK should for a different broker that is not running should succeed alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2")) } - private def registerBrokerInZk(id: Int): Unit = { + private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): Unit = { zkClient.createTopLevelPaths() val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192) zkClient.registerBroker(brokerInfo) } + + @ClusterTest + def testUpdateInvalidBrokersConfig(): Unit = { + checkInvalidBrokerConfig(None) + checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + for (incremental <- Array(true, false)) { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "invalid=2", + 
"--entity-type", "brokers") + ++ entityNameParams + ), incremental) + + val describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))) + // We will treat unknown config as sensitive + assertTrue(describeResult.contains("sensitive=true")) + // Sensitive config will not return + assertTrue(describeResult.contains("invalid=null")) + } + } + + @ClusterTest + def testUpdateInvalidTopicConfig(): Unit = { + TestUtils.createTopicWithAdminRaw( + admin = cluster.createAdminClient(), + topic = "test-config-topic", + ) + assertInstanceOf( + classOf[InvalidConfigurationException], + assertThrows( + classOf[ExecutionException], + () => ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "invalid=2", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ), true)).getCause + ) + } + + @ClusterTest + def testUpdateAndDeleteBrokersConfig(): Unit = { + checkBrokerConfig(None) + checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threads=2", + "--entity-type", "brokers") + ++ entityNameParams + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokerSocketServers().asScala.forall(broker => broker.config.getInt("log.cleaner.threads") == 2), + "Timeout waiting for topic config propagating to broker") + + val 
describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))) + assertTrue(describeResult.contains("log.cleaner.threads=2")) + assertTrue(describeResult.contains("sensitive=false")) + + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--delete-config", "log.cleaner.threads", + "--entity-type", "brokers") + ++ entityNameParams + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.config.getInt("log.cleaner.threads") != 2), + "Timeout waiting for topic config propagating to broker") + + assertFalse(TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))).contains("log.cleaner.threads")) + } + + @ClusterTest + def testUpdateConfigAndDeleteTopicConfig(): Unit = { + TestUtils.createTopicWithAdminRaw( + admin = cluster.createAdminClient(), + topic = "test-config-topic", + ) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "segment.bytes=10240000", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") == 10240000), + "Timeout waiting for topic config propagating to broker") + + val describeResult = TestUtils.grabConsoleOutput( + 
ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ))) + assertTrue(describeResult.contains("segment.bytes=10240000")) + assertTrue(describeResult.contains("sensitive=false")) + + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--delete-config", "segment.bytes", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") != 10240000), + "Timeout waiting for topic config propagating to broker") + + assertFalse(TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ))).contains("segment.bytes")) + } + + @ClusterTest + def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = { + // Test case from KAFKA-13788 + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threadzz=2", + "--entity-type", "brokers", + "--entity-default") + ), true) + + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threads=2", + "--entity-type", "brokers", + "--entity-default") + ), true) + } + + // TODO this test doesn't make sense because we can't produce `UnsupportedVersionException` by setting 
inter.broker.protocol.version + @ClusterTest(clusterType=Type.ZK, metadataVersion = MetadataVersion.IBP_3_2_IV0) + def testFallbackToDeprecatedAlterConfigs(): Unit = { + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threads=2", + "--entity-type", "brokers", + "--entity-default") + ), true) + } Review Comment: This case doesn't make sense because we can't produce `UnsupportedVersionException` by setting inter.broker.protocol.version, so I'm waiting for help from other reviewers and will leave it unchanged for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org