dengziming commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1475730897


##########
core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala:
##########
@@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends 
QuorumTestHarness with Logging {
     assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, 
None, encoderConfigs))
 
     // Dynamic config updates using ZK should fail if broker is running.
-    registerBrokerInZk(brokerId.toInt)
+    registerBrokerInZk(zkClient, brokerId.toInt)
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
 
     // Dynamic config updates using ZK for a different broker that is 
not running should succeed
     alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
   }
 
-  private def registerBrokerInZk(id: Int): Unit = {
+  private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): 
Unit = {
     zkClient.createTopLevelPaths()
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val endpoint = new EndPoint("localhost", 9092, 
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
     val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), 
MetadataVersion.latestTesting, jmxPort = 9192)
     zkClient.registerBroker(brokerInfo)
   }
+
+  @ClusterTest
+  def testUpdateInvalidBrokersConfig(): Unit = {
+    checkInvalidBrokerConfig(None)
+    
checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): 
Unit = {
+    for (incremental <- Array(true, false)) {
+      val entityNameParams = entityNameOrDefault.map(name => 
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+      ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--alter",
+          "--add-config", "invalid=2",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ), incremental)
+
+      val describeResult = TestUtils.grabConsoleOutput(
+        ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+          Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+            "--describe",
+            "--entity-type", "brokers")
+            ++ entityNameParams
+        )))
+      // We will treat unknown config as sensitive
+      assertTrue(describeResult.contains("sensitive=true"))
+      // The value of a sensitive config is not returned, so it is shown as null
+      assertTrue(describeResult.contains("invalid=null"))
+    }
+  }
+
+  @ClusterTest
+  def testUpdateInvalidTopicConfig(): Unit = {
+    TestUtils.createTopicWithAdminRaw(
+      admin = cluster.createAdminClient(),
+      topic = "test-config-topic",
+    )
+    assertInstanceOf(
+      classOf[InvalidConfigurationException],
+      assertThrows(
+        classOf[ExecutionException],
+        () => ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+          Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+            "--alter",
+            "--add-config", "invalid=2",
+            "--entity-type", "topics",
+            "--entity-name", "test-config-topic")
+        ), true)).getCause
+    )
+  }
+
+  @ClusterTest
+  def testUpdateAndDeleteBrokersConfig(): Unit = {
+    checkBrokerConfig(None)
+    
checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = {
+    val entityNameParams = entityNameOrDefault.map(name => 
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers")
+        ++ entityNameParams
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => cluster.brokerSocketServers().asScala.forall(broker => 
broker.config.getInt("log.cleaner.threads") == 2),
+      "Timeout waiting for topic config propagating to broker")
+
+    val describeResult = TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      )))
+    assertTrue(describeResult.contains("log.cleaner.threads=2"))
+    assertTrue(describeResult.contains("sensitive=false"))
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--delete-config", "log.cleaner.threads",
+        "--entity-type", "brokers")
+        ++ entityNameParams
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => 
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker 
=> broker.config.getInt("log.cleaner.threads") != 2),
+      "Timeout waiting for topic config propagating to broker")
+
+    assertFalse(TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ))).contains("log.cleaner.threads"))
+  }
+
+  @ClusterTest
+  def testUpdateConfigAndDeleteTopicConfig(): Unit = {
+    TestUtils.createTopicWithAdminRaw(
+      admin = cluster.createAdminClient(),
+      topic = "test-config-topic",
+    )
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "segment.bytes=10240000",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => 
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker 
=> 
broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes")
 == 10240000),
+      "Timeout waiting for topic config propagating to broker")
+
+    val describeResult = TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "topics",
+          "--entity-name", "test-config-topic")
+      )))
+    assertTrue(describeResult.contains("segment.bytes=10240000"))
+    assertTrue(describeResult.contains("sensitive=false"))
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--delete-config", "segment.bytes",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => 
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker 
=> 
broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes")
 != 10240000),
+      "Timeout waiting for topic config propagating to broker")
+
+    assertFalse(TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "topics",
+          "--entity-name", "test-config-topic")
+      ))).contains("segment.bytes"))
+  }
+
+  @ClusterTest
+  def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = {
+    // Test case from KAFKA-13788
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threadzz=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ), true)
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ), true)
+  }
+
+  // TODO this test doesn't make sense because we can't produce 
`UnsupportedVersionException` by setting inter.broker.protocol.version
+  @ClusterTest(clusterType=Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_2_IV0)
+  def testFallbackToDeprecatedAlterConfigs(): Unit = {
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ), true)
+  }

Review Comment:
   This test case doesn't make sense because we can't produce an
`UnsupportedVersionException` by setting inter.broker.protocol.version, so I'm
waiting for help from other reviewers and will leave it unchanged for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to