OmniaGM commented on code in PR #15304: URL: https://github.com/apache/kafka/pull/15304#discussion_r1474743330
########## core/src/main/scala/kafka/admin/ConfigCommand.scala: ########## @@ -405,7 +420,7 @@ object ConfigCommand extends Logging { val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } - ).asJavaCollection + ).asJavaCollection Review Comment: This change may not be needed; it looks like a whitespace-only change. ########## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ########## @@ -1684,7 +1684,7 @@ class ConfigCommandTest extends Logging { alterResult } } - ConfigCommand.alterConfig(mockAdminClient, alterOpts) + ConfigCommand.alterConfig(mockAdminClient, alterOpts, true) verify(describeResult).all() verify(alterResult).all() } Review Comment: I can't see any test for `alterConfig` with `useIncrementalAlterConfigs` set to false. Can we add some tests for this path as well, if possible? ########## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ########## @@ -430,7 +430,7 @@ class ConfigCommandTest extends Logging { def shouldFailIfUnrecognisedEntityType(): Unit = { val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d")) - assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)) + assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts, true)) Review Comment: If `useIncrementalAlterConfigs` defaulted to `true`, we wouldn't need to pass `true` explicitly here. 
########## core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala: ########## @@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, None, encoderConfigs)) // Dynamic config updates using ZK should fail if broker is running. - registerBrokerInZk(brokerId.toInt) + registerBrokerInZk(zkClient, brokerId.toInt) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId))) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "220000"), None)) // Dynamic config updates using ZK should for a different broker that is not running should succeed alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2")) } - private def registerBrokerInZk(id: Int): Unit = { + private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): Unit = { zkClient.createTopLevelPaths() val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192) zkClient.registerBroker(brokerInfo) } + + @ClusterTest + def testUpdateInvalidBrokersConfig(): Unit = { + checkInvalidBrokerConfig(None) + checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + for (incremental <- Array(true, false)) { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", Review Comment: There is no need to use string 
interpolation here as `cluster.bootstrapServers()` is already a string. ########## core/src/main/scala/kafka/admin/ConfigCommand.scala: ########## @@ -349,7 +349,7 @@ object ConfigCommand extends Logging { } @nowarn("cat=deprecation") - private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { + private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions, useIncrementalAlterConfigs: Boolean): Unit = { Review Comment: Couldn't we default `useIncrementalAlterConfigs` to `true`, especially since I can see that we later set it to `true` anyway? ########## core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala: ########## @@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, None, encoderConfigs)) // Dynamic config updates using ZK should fail if broker is running. - registerBrokerInZk(brokerId.toInt) + registerBrokerInZk(zkClient, brokerId.toInt) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId))) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "220000"), None)) // Dynamic config updates using ZK should for a different broker that is not running should succeed alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2")) } - private def registerBrokerInZk(id: Int): Unit = { + private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): Unit = { zkClient.createTopLevelPaths() val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192) zkClient.registerBroker(brokerInfo) } + + @ClusterTest + def 
testUpdateInvalidBrokersConfig(): Unit = { + checkInvalidBrokerConfig(None) + checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + for (incremental <- Array(true, false)) { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "invalid=2", + "--entity-type", "brokers") + ++ entityNameParams + ), incremental) + + val describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", Review Comment: The same needs to be updated in all test cases: the string interpolation around `cluster.bootstrapServers()` is redundant. ########## core/src/main/scala/kafka/admin/ConfigCommand.scala: ########## @@ -368,14 +368,29 @@ object ConfigCommand extends Logging { if (invalidConfigs.nonEmpty) throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") - val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead) + val configResourceType = entityTypeHead match { + case ConfigType.TOPIC => ConfigResource.Type.TOPIC + case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS + case ConfigType.BROKER => ConfigResource.Type.BROKER + } + val configResource = new ConfigResource(configResourceType, entityNameHead) val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } ).asJavaCollection - adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS) - case ConfigType.BROKER => + var retryUsingDeprecatedAlterConfigs = false + adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().whenComplete((_, e) => { + if (e.isInstanceOf[UnsupportedVersionException] && entityTypeHead == ConfigType.BROKER) { Review Comment: In Scala, `isInstanceOf` is usually considered something of an anti-pattern; it is mostly used in tests rather than in production code. Pattern matching may be a better option here, something like the following: ``` .whenComplete { case (_ , _: UnsupportedVersionException) if entityTypeHead == ConfigType.BROKER => retryUsingDeprecatedAlterConfigs = true case _ => // nothing to do } ``` ########## core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala: ########## @@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, None, encoderConfigs)) // Dynamic config updates using ZK should fail if broker is running. 
- registerBrokerInZk(brokerId.toInt) + registerBrokerInZk(zkClient, brokerId.toInt) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId))) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "220000"), None)) // Dynamic config updates using ZK should for a different broker that is not running should succeed alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2")) } - private def registerBrokerInZk(id: Int): Unit = { + private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): Unit = { zkClient.createTopLevelPaths() val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192) zkClient.registerBroker(brokerInfo) } + + @ClusterTest + def testUpdateInvalidBrokersConfig(): Unit = { + checkInvalidBrokerConfig(None) + checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + for (incremental <- Array(true, false)) { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "invalid=2", + "--entity-type", "brokers") + ++ entityNameParams + ), incremental) + + val describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))) + // We will treat 
unknown config as sensitive + assertTrue(describeResult.contains("sensitive=true")) + // Sensitive config will not return + assertTrue(describeResult.contains("invalid=null")) + } + } + + @ClusterTest + def testUpdateInvalidTopicConfig(): Unit = { + TestUtils.createTopicWithAdminRaw( + admin = cluster.createAdminClient(), + topic = "test-config-topic", + ) + assertInstanceOf( + classOf[InvalidConfigurationException], + assertThrows( + classOf[ExecutionException], + () => ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "invalid=2", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ), true)).getCause + ) + } + + @ClusterTest + def testUpdateAndDeleteBrokersConfig(): Unit = { + checkBrokerConfig(None) + checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threads=2", + "--entity-type", "brokers") + ++ entityNameParams + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokerSocketServers().asScala.forall(broker => broker.config.getInt("log.cleaner.threads") == 2), + "Timeout waiting for topic config propagating to broker") + + val describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))) + assertTrue(describeResult.contains("log.cleaner.threads=2")) + 
assertTrue(describeResult.contains("sensitive=false")) + + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--delete-config", "log.cleaner.threads", + "--entity-type", "brokers") + ++ entityNameParams + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.config.getInt("log.cleaner.threads") != 2), + "Timeout waiting for topic config propagating to broker") + + assertFalse(TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))).contains("log.cleaner.threads")) + } + + @ClusterTest + def testUpdateConfigAndDeleteTopicConfig(): Unit = { + TestUtils.createTopicWithAdminRaw( + admin = cluster.createAdminClient(), + topic = "test-config-topic", + ) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "segment.bytes=10240000", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") == 10240000), + "Timeout waiting for topic config propagating to broker") + + val describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ))) + assertTrue(describeResult.contains("segment.bytes=10240000")) + 
assertTrue(describeResult.contains("sensitive=false")) + + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--delete-config", "segment.bytes", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ), true) + TestUtils.waitUntilTrue( + () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") != 10240000), + "Timeout waiting for topic config propagating to broker") + + assertFalse(TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--describe", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ))).contains("segment.bytes")) + } + + @ClusterTest + def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = { + // Test case from KAFKA-13788 + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threadzz=2", + "--entity-type", "brokers", + "--entity-default") + ), true) + + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threads=2", + "--entity-type", "brokers", + "--entity-default") + ), true) + } + + // TODO this test doesn't make sense because we can't produce `UnsupportedVersionException` by setting inter.broker.protocol.version + @ClusterTest(clusterType=Type.ZK, metadataVersion = MetadataVersion.IBP_3_2_IV0) + def testFallbackToDeprecatedAlterConfigs(): Unit = { + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", 
s"${cluster.bootstrapServers()}", + "--alter", + "--add-config", "log.cleaner.threads=2", + "--entity-type", "brokers", + "--entity-default") + ), true) + } Review Comment: I can't see any tests for `useIncrementalAlterConfigs` set to false. Can we test this path as well if possible? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org