OmniaGM commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1474743330


##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -405,7 +420,7 @@ object ConfigCommand extends Logging {
         val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterLogLevelEntries = (configsToBeAdded.values.map(new 
AlterConfigOp(_, AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
+          ).asJavaCollection

Review Comment:
   this change may not be needed!



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -1684,7 +1684,7 @@ class ConfigCommandTest extends Logging {
         alterResult
       }
     }
-    ConfigCommand.alterConfig(mockAdminClient, alterOpts)
+    ConfigCommand.alterConfig(mockAdminClient, alterOpts, true)
     verify(describeResult).all()
     verify(alterResult).all()
   }

Review Comment:
   I can't see any test for `alterConfig` with `useIncrementalAlterConfigs` set 
to false. Can we add some tests for this path as well, if possible?



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -430,7 +430,7 @@ class ConfigCommandTest extends Logging {
   def shouldFailIfUnrecognisedEntityType(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
       "--entity-name", "client", "--entity-type", "not-recognised", "--alter", 
"--add-config", "a=b,c=d"))
-    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts))
+    assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), 
createOpts, true))

Review Comment:
   if `useIncrementalAlterConfigs` is set to true by default, we wouldn't need 
to explicitly set it to true. 



##########
core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala:
##########
@@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends 
QuorumTestHarness with Logging {
     assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, 
None, encoderConfigs))
 
     // Dynamic config updates using ZK should fail if broker is running.
-    registerBrokerInZk(brokerId.toInt)
+    registerBrokerInZk(zkClient, brokerId.toInt)
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
 
     // Dynamic config updates using ZK should for a different broker that is 
not running should succeed
     alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
   }
 
-  private def registerBrokerInZk(id: Int): Unit = {
+  private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): 
Unit = {
     zkClient.createTopLevelPaths()
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val endpoint = new EndPoint("localhost", 9092, 
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
     val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), 
MetadataVersion.latestTesting, jmxPort = 9192)
     zkClient.registerBroker(brokerInfo)
   }
+
+  @ClusterTest
+  def testUpdateInvalidBrokersConfig(): Unit = {
+    checkInvalidBrokerConfig(None)
+    
checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): 
Unit = {
+    for (incremental <- Array(true, false)) {
+      val entityNameParams = entityNameOrDefault.map(name => 
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+      ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",

Review Comment:
   There is no need to use string interpolation here as 
`cluster.bootstrapServers()` is already a string.



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -349,7 +349,7 @@ object ConfigCommand extends Logging {
   }
 
   @nowarn("cat=deprecation")
-  private[admin] def alterConfig(adminClient: Admin, opts: 
ConfigCommandOptions): Unit = {
+  private[admin] def alterConfig(adminClient: Admin, opts: 
ConfigCommandOptions, useIncrementalAlterConfigs: Boolean): Unit = {

Review Comment:
   couldn't we set `useIncrementalAlterConfigs` to `true` by default, especially 
since I can see that later we set it to true anyway. 



##########
core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala:
##########
@@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends 
QuorumTestHarness with Logging {
     assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, 
None, encoderConfigs))
 
     // Dynamic config updates using ZK should fail if broker is running.
-    registerBrokerInZk(brokerId.toInt)
+    registerBrokerInZk(zkClient, brokerId.toInt)
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
 
     // Dynamic config updates using ZK should for a different broker that is 
not running should succeed
     alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
   }
 
-  private def registerBrokerInZk(id: Int): Unit = {
+  private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): 
Unit = {
     zkClient.createTopLevelPaths()
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val endpoint = new EndPoint("localhost", 9092, 
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
     val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), 
MetadataVersion.latestTesting, jmxPort = 9192)
     zkClient.registerBroker(brokerInfo)
   }
+
+  @ClusterTest
+  def testUpdateInvalidBrokersConfig(): Unit = {
+    checkInvalidBrokerConfig(None)
+    
checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): 
Unit = {
+    for (incremental <- Array(true, false)) {
+      val entityNameParams = entityNameOrDefault.map(name => 
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+      ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--alter",
+          "--add-config", "invalid=2",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ), incremental)
+
+      val describeResult = TestUtils.grabConsoleOutput(
+        ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+          Array("--bootstrap-server", s"${cluster.bootstrapServers()}",

Review Comment:
   The same needs to be updated for all test cases



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -368,14 +368,29 @@ object ConfigCommand extends Logging {
         if (invalidConfigs.nonEmpty)
           throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
 
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
entityNameHead)
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+        }
+        val configResource = new ConfigResource(configResourceType, 
entityNameHead)
         val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
         ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
-      case ConfigType.BROKER =>
+        var retryUsingDeprecatedAlterConfigs = false
+        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().whenComplete((_, e) => {
+          if (e.isInstanceOf[UnsupportedVersionException] && entityTypeHead == 
ConfigType.BROKER) {

Review Comment:
   Usually in Scala, `isInstanceOf` is kind of an anti-pattern, mostly used in 
tests but not in production code. Using pattern matching may be a better option 
here; something like the following
    
   ```
           .whenComplete {
               case (_ , _: UnsupportedVersionException) if entityTypeHead == 
ConfigType.BROKER =>
                 retryUsingDeprecatedAlterConfigs = true
               case _ => // nothing to do
             }
      ```
          



##########
core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala:
##########
@@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends 
QuorumTestHarness with Logging {
     assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, 
None, encoderConfigs))
 
     // Dynamic config updates using ZK should fail if broker is running.
-    registerBrokerInZk(brokerId.toInt)
+    registerBrokerInZk(zkClient, brokerId.toInt)
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
     assertThrows(classOf[IllegalArgumentException], () => 
alterConfigWithZk(Map("message.max.size" -> "220000"), None))
 
     // Dynamic config updates using ZK should for a different broker that is 
not running should succeed
     alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
   }
 
-  private def registerBrokerInZk(id: Int): Unit = {
+  private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): 
Unit = {
     zkClient.createTopLevelPaths()
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val endpoint = new EndPoint("localhost", 9092, 
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
     val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), 
MetadataVersion.latestTesting, jmxPort = 9192)
     zkClient.registerBroker(brokerInfo)
   }
+
+  @ClusterTest
+  def testUpdateInvalidBrokersConfig(): Unit = {
+    checkInvalidBrokerConfig(None)
+    
checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): 
Unit = {
+    for (incremental <- Array(true, false)) {
+      val entityNameParams = entityNameOrDefault.map(name => 
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+      ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--alter",
+          "--add-config", "invalid=2",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ), incremental)
+
+      val describeResult = TestUtils.grabConsoleOutput(
+        ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+          Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+            "--describe",
+            "--entity-type", "brokers")
+            ++ entityNameParams
+        )))
+      // We will treat unknown config as sensitive
+      assertTrue(describeResult.contains("sensitive=true"))
+      // Sensitive config will not return
+      assertTrue(describeResult.contains("invalid=null"))
+    }
+  }
+
+  @ClusterTest
+  def testUpdateInvalidTopicConfig(): Unit = {
+    TestUtils.createTopicWithAdminRaw(
+      admin = cluster.createAdminClient(),
+      topic = "test-config-topic",
+    )
+    assertInstanceOf(
+      classOf[InvalidConfigurationException],
+      assertThrows(
+        classOf[ExecutionException],
+        () => ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+          Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+            "--alter",
+            "--add-config", "invalid=2",
+            "--entity-type", "topics",
+            "--entity-name", "test-config-topic")
+        ), true)).getCause
+    )
+  }
+
+  @ClusterTest
+  def testUpdateAndDeleteBrokersConfig(): Unit = {
+    checkBrokerConfig(None)
+    
checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = {
+    val entityNameParams = entityNameOrDefault.map(name => 
Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers")
+        ++ entityNameParams
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => cluster.brokerSocketServers().asScala.forall(broker => 
broker.config.getInt("log.cleaner.threads") == 2),
+      "Timeout waiting for topic config propagating to broker")
+
+    val describeResult = TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      )))
+    assertTrue(describeResult.contains("log.cleaner.threads=2"))
+    assertTrue(describeResult.contains("sensitive=false"))
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--delete-config", "log.cleaner.threads",
+        "--entity-type", "brokers")
+        ++ entityNameParams
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => 
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker 
=> broker.config.getInt("log.cleaner.threads") != 2),
+      "Timeout waiting for topic config propagating to broker")
+
+    assertFalse(TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ))).contains("log.cleaner.threads"))
+  }
+
+  @ClusterTest
+  def testUpdateConfigAndDeleteTopicConfig(): Unit = {
+    TestUtils.createTopicWithAdminRaw(
+      admin = cluster.createAdminClient(),
+      topic = "test-config-topic",
+    )
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "segment.bytes=10240000",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => 
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker 
=> 
broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes")
 == 10240000),
+      "Timeout waiting for topic config propagating to broker")
+
+    val describeResult = TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "topics",
+          "--entity-name", "test-config-topic")
+      )))
+    assertTrue(describeResult.contains("segment.bytes=10240000"))
+    assertTrue(describeResult.contains("sensitive=false"))
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--delete-config", "segment.bytes",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ), true)
+    TestUtils.waitUntilTrue(
+      () => 
cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker 
=> 
broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes")
 != 10240000),
+      "Timeout waiting for topic config propagating to broker")
+
+    assertFalse(TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+        Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+          "--describe",
+          "--entity-type", "topics",
+          "--entity-name", "test-config-topic")
+      ))).contains("segment.bytes"))
+  }
+
+  @ClusterTest
+  def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = {
+    // Test case from KAFKA-13788
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threadzz=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ), true)
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ), true)
+  }
+
+  // TODO this test doesn't make sense because we can't produce 
`UnsupportedVersionException` by setting inter.broker.protocol.version
+  @ClusterTest(clusterType=Type.ZK, metadataVersion = 
MetadataVersion.IBP_3_2_IV0)
+  def testFallbackToDeprecatedAlterConfigs(): Unit = {
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new 
ConfigCommandOptions(
+      Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ), true)
+  }

Review Comment:
   I can't see any tests for `useIncrementalAlterConfigs` set to false. Can we 
test this path as well, if possible? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to