junrao commented on code in PR #14632:
URL: https://github.com/apache/kafka/pull/14632#discussion_r1385792007


##########
core/src/main/scala/kafka/server/DynamicConfig.scala:
##########
@@ -111,6 +112,16 @@ object DynamicConfig {
     }
   }
 
+  object ClientMetrics {
+    private val clientConfigs = kafka.metrics.ClientMetricsConfigs.configDef()
+
+    def configKeys: util.Map[String, ConfigDef.ConfigKey] = 
clientConfigs.configKeys
+
+    def names: util.Set[String] = clientConfigs.names
+
+    def validate(props: Properties) = DynamicConfig.validate(clientConfigs, 
props, customPropsAllowed = false)

Review Comment:
   This is for the ZK-based controller. Since ClientMetrics is not supported 
there, do we need `validate`? Ditto for `configKeys`.



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -536,6 +552,8 @@ object ConfigCommand extends Logging {
           adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
         case ConfigType.Broker | BrokerLoggerConfigType =>
           adminClient.describeCluster(new 
DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ 
BrokerDefaultEntityName
+        case ConfigType.ClientMetrics =>
+          throw new InvalidRequestException("Client metrics entity-name is 
required")

Review Comment:
   I assume this will be improved through KIP-1000 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1000%3A+List+Client+Metrics+Configuration+Resources)?



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -1628,6 +1628,122 @@ class ConfigCommandTest extends Logging {
         Seq("<default>/clients/client-3", sanitizedPrincipal + 
"/clients/client-2"))
   }
 
+  @Test
+  def shouldAlterClientMetricsConfig(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    verifyAlterClientMetricsConfig(node, "1", List("--entity-name", "1"))
+  }
+
+  def verifyAlterClientMetricsConfig(node: Node, resourceName: String, 
resourceOpts: List[String]): Unit = {

Review Comment:
   Could this be private?



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -1628,6 +1628,122 @@ class ConfigCommandTest extends Logging {
         Seq("<default>/clients/client-3", sanitizedPrincipal + 
"/clients/client-2"))
   }
 
+  @Test
+  def shouldAlterClientMetricsConfig(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    verifyAlterClientMetricsConfig(node, "1", List("--entity-name", "1"))
+  }
+
+  def verifyAlterClientMetricsConfig(node: Node, resourceName: String, 
resourceOpts: List[String]): Unit = {
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", "client-metrics",
+      "--alter",
+      "--delete-config", "interval.ms",
+      "--add-config", "metrics=org.apache.kafka.consumer.," +
+        
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]") 
++ resourceOpts
+    val alterOpts = new ConfigCommandOptions(optsList.toArray)
+
+    val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, 
resourceName)
+    val configEntries = util.Collections.singletonList(new 
ConfigEntry("interval.ms", "1000",
+      ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, 
Collections.emptyList,
+      ConfigEntry.ConfigType.UNKNOWN, null))
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    future.complete(util.Collections.singletonMap(resource, new 
Config(configEntries)))
+    val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+    when(describeResult.all()).thenReturn(future)
+
+    val alterFuture = new KafkaFutureImpl[Void]
+    alterFuture.complete(null)
+    val alterResult: AlterConfigsResult = mock(classOf[AlterConfigsResult])
+    when(alterResult.all()).thenReturn(alterFuture)
+
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertFalse(options.includeSynonyms(), "Config synonyms requested 
unnecessarily")
+        assertEquals(1, resources.size)
+        val resource = resources.iterator.next
+        assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+        assertEquals(resourceName, resource.name)
+        describeResult
+      }
+
+      override def incrementalAlterConfigs(configs: util.Map[ConfigResource, 
util.Collection[AlterConfigOp]], options: AlterConfigsOptions): 
AlterConfigsResult = {
+        assertEquals(1, configs.size)
+        val entry = configs.entrySet.iterator.next
+        val resource = entry.getKey
+        val alterConfigOps = entry.getValue
+        assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+        assertEquals(3, alterConfigOps.size)
+
+        val expectedConfigOps = List(
+          new AlterConfigOp(new ConfigEntry("match", 
"client_software_name=kafka.python,client_software_version=1\\.2\\..*"), 
AlterConfigOp.OpType.SET),
+          new AlterConfigOp(new ConfigEntry("metrics", 
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
+          new AlterConfigOp(new ConfigEntry("interval.ms", ""), 
AlterConfigOp.OpType.DELETE)
+        )
+        assertEquals(expectedConfigOps, alterConfigOps.asScala.toList)
+        alterResult
+      }
+    }
+    ConfigCommand.alterConfig(mockAdminClient, alterOpts)
+    verify(describeResult).all()

Review Comment:
   Do we need to verify `alterResult` too? Also, why do we need to call `all()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to