junrao commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1416359770


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  // Just a placeholder for now.
   def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
     val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
       requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      // Just return an empty list in the placeholder
-      val data = new ListClientMetricsResourcesResponseData()
-      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+      clientMetricsManager match {
+        case Some(metricsManager) =>
+          try {
+            val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
+              metricsManager.listClientMetricsResources.asScala.map(
+                name => new 
ClientMetricsResource().setName(name)).toList.asJava)
+            requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+          } catch {
+            case _: Exception =>

Review Comment:
   Do we need this? `KafkaApis` has a generic way to handle unexpected errors 
through `handleError`.



##########
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##########
@@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging {
   }
 
   @Test
-  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+  def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = {
     val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
       "--entity-type", "client-metrics",
       "--describe"))
 
-    val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
-    assertEquals("an entity name must be specified with --describe of 
client-metrics", exception.getMessage)
+    val resourceCustom = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1")
+    val configEntry = new ConfigEntry("metrics", "*")
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+    when(describeResult.all()).thenReturn(future)
+
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {

Review Comment:
   Hmm, this should call `listClientMetricsResources`, not `describeConfigs`, 
right?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6922,4 +6923,58 @@ class KafkaApisTest {
     val expectedResponse = new 
PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code)
     assertEquals(expectedResponse, response.data)
   }
+
+  @Test
+  def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = {
+    val request = buildRequest(new 
ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData()).build())
+    createKafkaApis(enableForwarding = true).handle(request, 
RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR, 
Errors.forCode(response.data.errorCode))

Review Comment:
   Hmm, the code seems to set the error to `UNSUPPORTED_VERSION`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to