Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
junrao merged PR #14933: URL: https://github.com/apache/kafka/pull/14933 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on PR #14933: URL: https://github.com/apache/kafka/pull/14933#issuecomment-1843520738 > @apoorvmittal10 : Thanks for the updated PR. Just a minor comment. @junrao Thanks for again reviewing the PR. I have added thoughts for the comment and verified same locally as well. I have also verified the build and tests which failed, are not related to PR changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1417826343 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { +assertTrue(options.includeSynonyms()) +assertEquals(1, resources.size) +val resource = resources.iterator.next +assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`) +assertTrue(resourceCustom.name == resource.name) +future.complete(Map(resourceCustom -> new Config(util.Collections.singletonList(configEntry))).asJava) +describeResult + } +} + mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom, + util.Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))), new AlterConfigsOptions()) +ConfigCommand.describeConfig(mockAdminClient, describeOpts) +verify(describeResult).all() Review Comment: @junrao The verification confirms that `.all` was exactly called/consumed once for the returned result in `ConfigCommand.scala`, by the code mentioned above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1417807074 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { +assertTrue(options.includeSynonyms()) +assertEquals(1, resources.size) +val resource = resources.iterator.next +assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`) +assertTrue(resourceCustom.name == resource.name) +future.complete(Map(resourceCustom -> new Config(util.Collections.singletonList(configEntry))).asJava) +describeResult + } +} + mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom, + util.Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))), new AlterConfigsOptions()) +ConfigCommand.describeConfig(mockAdminClient, describeOpts) +verify(describeResult).all() Review Comment: `.all` is needed as it verifies the returned result future is actually consumed in `ConfigCommand.scala` here: https://github.com/apache/kafka/blob/46852eea1c620ff786f4c4c1ff4cbd47f912a1d9/core/src/main/scala/kafka/admin/ConfigCommand.scala#L610 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
junrao commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1417759706 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { +assertTrue(options.includeSynonyms()) +assertEquals(1, resources.size) +val resource = resources.iterator.next +assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`) +assertTrue(resourceCustom.name == resource.name) +future.complete(Map(resourceCustom -> new Config(util.Collections.singletonList(configEntry))).asJava) +describeResult + } +} + mockAdminClient.incrementalAlterConfigs(util.Collections.singletonMap(resourceCustom, + util.Collections.singletonList(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))), new AlterConfigsOptions()) +ConfigCommand.describeConfig(mockAdminClient, describeOpts) +verify(describeResult).all() Review Comment: It seems that we don't need to call `.all()` since we don't do anything with the return value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on PR #14933: URL: https://github.com/apache/kafka/pull/14933#issuecomment-1843269344 @junrao Build passed with unrelated flaky tests failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on PR #14933: URL: https://github.com/apache/kafka/pull/14933#issuecomment-1842077768 @junrao Thanks for reviewing the PR, I have addressed the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416635870 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -6922,4 +6923,58 @@ class KafkaApisTest { val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } + + @Test + def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { +val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) +createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + +val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) +assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) Review Comment: This test covers when KafkaApis is configured in ZK mode and gets rejected by common `handle` method which throws `new IllegalStateException` which translates to `UNKNOWN_SERVER_ERROR`. Then it also makes sense that no handling is needed in `handleListClientMetricsResources`, as method should always have `clientMetricsManager` if call reaches to that method. But I completed the request logically there if `clientMetricsManager` is None to avoid any error case. I have added a comment in the code as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416644082 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Just a placeholder for now. def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - // Just return an empty list in the placeholder - val data = new ListClientMetricsResourcesResponseData() - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + clientMetricsManager match { +case Some(metricsManager) => + try { +val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + metricsManager.listClientMetricsResources.asScala.map( +name => new ClientMetricsResource().setName(name)).toList.asJava) +requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + } catch { +case _: Exception => Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416642552 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Just a placeholder for now. def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - // Just return an empty list in the placeholder - val data = new ListClientMetricsResourcesResponseData() - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + clientMetricsManager match { +case Some(metricsManager) => + try { +val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + metricsManager.listClientMetricsResources.asScala.map( +name => new ClientMetricsResource().setName(name)).toList.asJava) +requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + } catch { +case _: Exception => Review Comment: Yeah, this is not needed in this API. I have removed it and validated in test that API returns `UNKNOWN_SERVER_ERROR` now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416635870 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -6922,4 +6923,58 @@ class KafkaApisTest { val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } + + @Test + def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { +val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) +createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + +val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) +assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) Review Comment: This test covers when KafkaApis is configured in ZK mode and gets rejected by common `handle` method which throws `new IllegalStateException` which translates to `UNKNOWN_SERVER_ERROR`. Then it also makes sense that no handling is needed in `handleListClientMetricsResources`, as method should always have `clientMetricsManager` if call reaches to that method. But I completed the request logically there if `clientMetricsManager` is None to avoid any error case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416631582 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { Review Comment: When the entity-name is not provided then first all names are fetched by calling `listClientMetricsResources` and then later `describeConfigs` gets called on every entity to get the details. I have implemented `listClientMetricsResources` in `MockAdminClient` and calls `mockAdminClient.incrementalAlterConfigs` to set the client metrics resource in `MockAdminClient`. The test calls `listClientMetricsResources` from ConfigCommand.scala and result of which is passed in `describeConfigs` for validation. Later I also verify if `describeConfigs` was called. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
junrao commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416359770 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Just a placeholder for now. def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - // Just return an empty list in the placeholder - val data = new ListClientMetricsResourcesResponseData() - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + clientMetricsManager match { +case Some(metricsManager) => + try { +val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + metricsManager.listClientMetricsResources.asScala.map( +name => new ClientMetricsResource().setName(name)).toList.asJava) +requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + } catch { +case _: Exception => Review Comment: Do we need this? `KafkaApis` has a generic way to handle unexpected errors through `handleError`. ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { Review Comment: Hmm, this should call `listClientMetricsResources`, not `describeConfigs`, right? ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -6922,4 +6923,58 @@ class KafkaApisTest { val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } + + @Test + def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { +val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) +createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + +val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) +assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) Review Comment: Hmm, the code seems to set the error to `UNSUPPORTED_VERSION`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on PR #14933: URL: https://github.com/apache/kafka/pull/14933#issuecomment-1841542850 @junrao @AndrewJSchofield Please if I can get the review on the Minor PR for the improvement of KIP-714 by using functionality exposed by KIP-1000. Also added support for listing resources in KIP-1000. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org