Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao merged PR #14699: URL: https://github.com/apache/kafka/pull/14699 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1832344298 @junrao The test failures are not related to the changes.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1832108927
Hi @junrao, I have triaged the test cases as per the latest run.
`Existing`: Build / JDK 17 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest 2m 21s: https://issues.apache.org/jira/browse/KAFKA-15675
`Existing`: Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=zk – integration.kafka.server.FetchFromFollowerIntegrationTest 39s: https://issues.apache.org/jira/browse/KAFKA-15020
`Existing`: Build / JDK 17 and Scala 2.13 / testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic – kafka.api.SslConsumerTest 8s: https://issues.apache.org/jira/browse/KAFKA-15920
`Existing`: Build / JDK 17 and Scala 2.13 / testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest 3s: https://issues.apache.org/jira/browse/KAFKA-15419
`Existing`: Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest 51s: https://issues.apache.org/jira/browse/KAFKA-15898
`Existing`: Build / JDK 21 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 1m 42s: https://issues.apache.org/jira/browse/KAFKA-15523
`Existing`: Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s: https://issues.apache.org/jira/browse/KAFKA-15760
`Existing`: Build / JDK 11 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 3m 44s: https://issues.apache.org/jira/browse/KAFKA-15523
`Triaged`: Build / JDK 11 and Scala 2.13 / testRestartReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 3m 21s: https://issues.apache.org/jira/browse/KAFKA-15933
`Triaged`: Build / JDK 11 and Scala 2.13 / testMultiConsumerStickyAssignor(String, String).quorum=kraft+kip848.groupProtocol=generic – kafka.api.PlaintextConsumerTest 2m 12s: https://issues.apache.org/jira/browse/KAFKA-15934
`Existing`: Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 4m 25s: https://issues.apache.org/jira/browse/KAFKA-14971
`Triaged`: Build / JDK 8 and Scala 2.12 / testRestartReplication() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 2m 8s: https://issues.apache.org/jira/browse/KAFKA-15935
`Existing`: Build / JDK 8 and Scala 2.12 / testNoCheckpointsIfNoRecordsAreMirrored() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 2m 59s: https://issues.apache.org/jira/browse/KAFKA-15699
`Existing`: Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest 3m 37s: https://issues.apache.org/jira/browse/KAFKA-15523
`Existing`: Build / JDK 8 and Scala 2.12 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest 2m 28s: https://issues.apache.org/jira/browse/KAFKA-15761
`Existing`: Build / JDK 8 and Scala 2.12 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest 2m 39s: https://issues.apache.org/jira/browse/KAFKA-15761
`Triaged`: Build / JDK 8 and Scala 2.12 / testGetActiveTopics – org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest 1m 50s: https://issues.apache.org/jira/browse/KAFKA-15936
`Triaged`: Build / JDK 8 and Scala 2.12 / testTopicTrackingResetIsDisabled – org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest 2m 37s: https://issues.apache.org/jira/browse/KAFKA-15936
`Triaged`: Build / JDK 8 and Scala 2.12 / testTopicTrackingIsDisabled – org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest 3m 4s: https://issues.apache.org/jira/browse/KAFKA-15936
`Existing`: Build / JDK 8 and Scala 2.12 / testConnectorReconfiguration – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest 1m 42s: https://issues.apache.org/jira/browse/KAFKA-14901
`Existing`: Build / JDK 8 and Scala 2.12 / testSeparateOffsetsTopic – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest 3m 35s: https://issues.apache.org/jira/browse/KAFKA-14089
`Triaged`: Build / JDK 8 and Scala 2.12 / testTopicDeletion(String).quorum=kraft – kafka.admin.RemoteTopicCrudTest 1m 30s: https://issues.apache.org/jira/browse/KAFKA-15937
`Triaged`: Build / JDK 8 and Scala 2.12 /
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) URL: https://github.com/apache/kafka/pull/14699
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1831074267
> @apoorvmittal10 : It seems that the build for JDK 17 and Scala 2.13 didn't complete.

Thanks @junrao. Strange to see the failure in `:clients:test`. I can see the unrelated failure below in the logs and have retriggered the build.
```
> Task :clients:test
org.apache.kafka.common.network.SelectorTest.testConnectionsByClientMetric() failed, log available in /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-14699/clients/build/reports/testOutput/org.apache.kafka.common.network.SelectorTest.testConnectionsByClientMetric().test.stdout
Gradle Test Run :clients:test > Gradle Test Executor 13 > SelectorTest > testConnectionsByClientMetric() FAILED
    java.util.concurrent.TimeoutException: testConnectionsByClientMetric() timed out after 240 seconds
        at org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
        at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
```
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1830813531 @apoorvmittal10 : It seems that the build for JDK 17 and Scala 2.13 didn't complete.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1830450426
> Just merged #14632. Triggering another test run to make sure there are no new issues.

Thanks @junrao.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1830354235 Just merged https://github.com/apache/kafka/pull/14632. Triggering another test run to make sure there are no new issues.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) URL: https://github.com/apache/kafka/pull/14699
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1828329099 @apoorvmittal10 : Thanks for triaging the tests. In the mailing list, it seems that we are still leaning towards requiring green builds before merging a PR.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1826397165
@junrao @AndrewJSchofield Thanks for the LGTM and for approving the PR. Below is the test run status. None of the failing tests are related to the changes in the PR, but I have tried to debug further. Of the 20, 17 have already been reported as flaky tests in Jira; for the remaining 3, I tried to reproduce them locally but couldn't, so I created `flaky test` Jiras for them. Can we merge the PR?
New failing - 20
`Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / testResetSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest 1m 2s: https://issues.apache.org/jira/browse/KAFKA-15524
`Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=kraft – integration.kafka.server.FetchFromFollowerIntegrationTest 8s: https://issues.apache.org/jira/browse/KAFKA-15020
`Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / testRackAwareRangeAssignor(String).quorum=kraft – integration.kafka.server.FetchFromFollowerIntegrationTest 6s: https://issues.apache.org/jira/browse/KAFKA-15020
`Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s: https://issues.apache.org/jira/browse/KAFKA-15760
`Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest 2m 0s: https://issues.apache.org/jira/browse/KAFKA-15292
`Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest 31s: https://issues.apache.org/jira/browse/KAFKA-15146
`Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest 4s: https://issues.apache.org/jira/browse/KAFKA-15759
`Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=zk – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest 1m 15s: https://issues.apache.org/jira/browse/KAFKA-15772
`Created Jira, cannot reproduce locally (other flaky tests from the same class have already been reported by others)`: Build / JDK 21 and Scala 2.13 / shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest 4s: Another test in same class flaky: https://issues.apache.org/jira/browse/KAFKA-9897, created jira: https://issues.apache.org/jira/browse/KAFKA-15896
`Already existing jira for flaky test`: Build / JDK 17 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 2m 23s: https://issues.apache.org/jira/browse/KAFKA-15699
`Already existing jira for flaky test`: Build / JDK 17 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest 2m 33s: https://issues.apache.org/jira/browse/KAFKA-15675
`Already existing jira for flaky test`: Build / JDK 17 and Scala 2.13 / testDescribeTokenForOtherUserPasses(String).quorum=kraft – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest 9s: https://issues.apache.org/jira/browse/KAFKA-15411
`Created flaky test jira, cannot reproduce locally`: Build / JDK 17 and Scala 2.13 / testWrongIncarnationId() – kafka.server.ControllerRegistrationManagerTest 30s: https://issues.apache.org/jira/browse/KAFKA-15897
`Created flaky test jira, cannot reproduce locally`: Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest 43s: https://issues.apache.org/jira/browse/KAFKA-15898
`Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 2m 33s: https://issues.apache.org/jira/browse/KAFKA-14971
`Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / [2] quorum=kraft, isIdempotenceEnabled=false – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest 10s: https://issues.apache.org/jira/browse/KAFKA-15411
`Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest 4s: https://issues.apache.org/jira/browse/KAFKA-15419
`Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 /
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1404286606
## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ##
@@ -386,7 +388,9 @@ public enum Errors {
     STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new),
     MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new),
     UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),
-    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new);
+    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new),
+    UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID", UnknownSubscriptionIdException::new),
Review Comment: Done, thanks for pointing out.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1404253943
## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ##
@@ -386,7 +388,9 @@ public enum Errors {
     STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new),
     MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new),
     UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new),
-    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new);
+    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new),
+    UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID", UnknownSubscriptionIdException::new),
Review Comment: Tiny change request. Please put a full stop at the end of the error strings. `"Client sent a push telemetry request with an invalid or outdated subscription ID."`.
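The convention requested in the review comment above (error strings end with a full stop) can be checked mechanically. What follows is a minimal sketch, not code from the PR: `DemoErrors` and `ErrorMessageStyle` are hypothetical stand-ins, and the real `org.apache.kafka.common.protocol.Errors` enum is not used here.

```java
import java.util.ArrayList;
import java.util.List;

final class ErrorMessageStyle {
    // Returns the names of all constants whose message does not end with a full stop.
    static List<String> missingFullStop() {
        List<String> offenders = new ArrayList<>();
        for (DemoErrors error : DemoErrors.values()) {
            if (!error.message.endsWith(".")) {
                offenders.add(error.name());
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        System.out.println("Messages missing a full stop: " + missingFullStop());
    }
}

// Hypothetical stand-in for an error-code enum (assumption: only code and message matter here).
enum DemoErrors {
    UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known."),
    UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.");

    final int code;
    final String message;

    DemoErrors(int code, String message) {
        this.code = code;
        this.message = message;
    }
}
```

A check like this could run as a unit test so that new enum entries follow the same style without relying on review.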
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) URL: https://github.com/apache/kafka/pull/14699
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1822604515
> @apoorvmittal10 : Thanks for the updated PR. LGTM. Are the 34 test failures related?

Thanks @junrao, the test failures are not related, and I'll trigger a rebuild to see whether they persist.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1820280356 Thanks @junrao for explaining the details. I have updated the PR and removed throttleMs from ClientMetricsManager. I have added a Jira to add the corresponding throttling changes in QuotaManager.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400049044
## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala: ##
@@ -103,9 +103,16 @@ class DynamicConfigPublisher(
           )
         case CLIENT_METRICS =>
           // Apply changes to client metrics subscription.
-          info(s"Updating client metrics subscription ${resource.name()} with new configuration : " +
-            toLoggableProps(resource, props).mkString(","))
-          dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(), props)
+          dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler =>
+            try {
+              info(s"Updating client metrics ${resource.name()} with new configuration : " +
+                toLoggableProps(resource, props).mkString(","))
Review Comment: Done.
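The hunk above replaces a direct map lookup with an optional lookup wrapped in `try`. The same shape can be sketched in Java; this is an illustration only. `GuardedDispatch`, `ConfigHandler`, and the failure counter are hypothetical names, and the behavior of the `catch` branch is an assumption, because the quoted diff is cut off before it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

final class GuardedDispatch {
    // Hypothetical handler interface, for illustration only.
    interface ConfigHandler {
        void processConfigChanges(String resourceName, Properties props);
    }

    private final Map<String, ConfigHandler> handlers = new HashMap<>();
    private int failures = 0;

    void register(String configType, ConfigHandler handler) {
        handlers.put(configType, handler);
    }

    // Returns true only if a handler was registered and completed without throwing.
    boolean publish(String configType, String resourceName, Properties props) {
        Optional<ConfigHandler> handler = Optional.ofNullable(handlers.get(configType));
        if (!handler.isPresent()) {
            // No handler registered: skip quietly instead of failing the lookup.
            return false;
        }
        try {
            handler.get().processConfigChanges(resourceName, props);
            return true;
        } catch (RuntimeException e) {
            // Assumption: a failing handler is recorded and must not abort the publisher.
            failures++;
            System.err.println("Error updating " + resourceName + ": " + e.getMessage());
            return false;
        }
    }

    int failures() {
        return failures;
    }

    public static void main(String[] args) {
        GuardedDispatch dispatch = new GuardedDispatch();
        dispatch.register("client-metrics", (name, props) -> System.out.println("Updated " + name));
        System.out.println(dispatch.publish("client-metrics", "sub-1", new Properties()));
    }
}
```

The point of the guard is that a missing or failing handler for one config type leaves the rest of the publisher unaffected.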
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400048884
## core/src/main/java/kafka/server/ClientMetricsManager.java: ##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and instance information.
  */
 public class ClientMetricsManager implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
-    private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+
+    private final Cache clientInstanceCache;
+    private final Map subscriptionMap;
+    private final KafkaConfig config;
+    private final Time time;
+
+    // The latest subscription version is used to determine if subscription has changed and needs
+    // to re-evaluate the client instance subscription id as per changed subscriptions.
+    private final AtomicInteger subscriptionUpdateVersion;
-    public static ClientMetricsManager instance() {
-        return INSTANCE;
+    public ClientMetricsManager(KafkaConfig config, Time time) {
+        this.subscriptionMap = new ConcurrentHashMap<>();
+        this.subscriptionUpdateVersion = new AtomicInteger(0);
+        this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+        this.config = config;
+        this.time = time;
     }
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // Validate the subscription properties.
+        ClientMetricsConfigs.validate(subscriptionName, properties);
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                this.subscriptionUpdateVersion.incrementAndGet();
+            }
+            return;
+        }
+
+        updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties));
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+
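The `updateSubscription` logic quoted above (empty properties mean "remove", and every effective change bumps a version counter) can be illustrated with a small stand-alone sketch. `SubscriptionRegistry` and its methods are hypothetical names, not the PR's classes. The body of `updateClientSubscription` is not shown in the thread, so the "store and bump the version" step below is an assumption.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SubscriptionRegistry {
    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    // Bumped on every effective change so callers can detect stale cached state.
    private final AtomicInteger updateVersion = new AtomicInteger(0);

    void update(String name, Properties props) {
        if (props.isEmpty()) {
            // Empty properties signal deletion; an unknown name is ignored without a version bump.
            if (subscriptions.remove(name) != null) {
                updateVersion.incrementAndGet();
            }
            return;
        }
        // Assumption: storing a subscription also counts as a change.
        Properties copy = new Properties();
        copy.putAll(props);
        subscriptions.put(name, copy);
        updateVersion.incrementAndGet();
    }

    // A client instance that cached an older version should re-evaluate its subscriptions.
    boolean isStale(int seenVersion) {
        return seenVersion != updateVersion.get();
    }

    int version() {
        return updateVersion.get();
    }

    int size() {
        return subscriptions.size();
    }

    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        Properties props = new Properties();
        props.setProperty("interval.ms", "1000");
        registry.update("sub-1", props);
        System.out.println("version=" + registry.version() + ", size=" + registry.size());
    }
}
```

A monotonically increasing counter avoids depending on clock resolution: two updates inside the same millisecond still produce distinct versions, which a timestamp comparison might miss.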
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400045702
## core/src/main/java/kafka/server/ClientMetricsManager.java: ##
@@ -16,31 +16,420 @@
  */
 package kafka.server;
+import kafka.metrics.ClientMetricsConfigs;
Review Comment: I hit a merge conflict in the current PR, so I merged the upstream changes, which required moving the classes to the right package. I have done this in the current PR itself.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1399714279 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); -private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; + +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final KafkaConfig config; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; -public static ClientMetricsManager instance() { -return INSTANCE; +public ClientMetricsManager(KafkaConfig config, Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.config = config; +this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. 
+ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest
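The version-counter idea in the hunk above (bump `subscriptionUpdateVersion` whenever the subscription map changes, then compare it with the version a cached client instance was built against) can be sketched in isolation. This is a minimal illustration, not the PR's code: `SubscriptionVersionSketch`, its `String` config values and `isStale` are hypothetical names, and it assumes that a non-empty update also bumps the version. It also uses a single `remove(...) != null` step in place of the quoted `containsKey` followed by `remove`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the manager's subscription bookkeeping; not the PR's class.
class SubscriptionVersionSketch {
    private final Map<String, String> subscriptions = new ConcurrentHashMap<>();
    private final AtomicInteger updateVersion = new AtomicInteger(0);

    // An empty config is treated as "delete this subscription".
    // The version only moves when the map actually changed.
    void update(String name, String config) {
        if (config == null || config.isEmpty()) {
            if (subscriptions.remove(name) != null) {
                updateVersion.incrementAndGet();
            }
            return;
        }
        subscriptions.put(name, config);
        // Assumption: every non-empty update counts as a change.
        updateVersion.incrementAndGet();
    }

    int version() {
        return updateVersion.get();
    }

    // A cached client instance remembers the version it was computed against;
    // a mismatch means its subscription id should be re-evaluated.
    boolean isStale(int cachedVersion) {
        return cachedVersion != updateVersion.get();
    }
}
```

Deleting a subscription that does not exist leaves the version untouched, so cached client instances are not re-evaluated for a no-op config update.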
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817955171 Thanks for the review @junrao. I have updated the PR.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398480299 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
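In the revision quoted above, the push path checks `clientInstanceId == Uuid.ZERO_UUID`, while the get path uses `equals`. The two can give different answers for an all-zero id that was built from request bytes instead of taken from the shared constant. The sketch below shows the difference with a hypothetical value type, `IdSketch`; it is not Kafka's `Uuid`, and whether Kafka's deserializer ever returns the shared constant is not shown in this thread. Other revisions quoted in this thread use `Uuid.RESERVED.contains(clientInstanceId)` instead.

```java
import java.util.Objects;

// Hypothetical 128-bit id value type used only for illustration; not Kafka's Uuid.
final class IdSketch {
    static final IdSketch ZERO = new IdSketch(0L, 0L);

    private final long hi;
    private final long lo;

    IdSketch(long hi, long lo) {
        this.hi = hi;
        this.lo = lo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof IdSketch)) return false;
        IdSketch other = (IdSketch) o;
        return hi == other.hi && lo == other.lo;
    }

    @Override
    public int hashCode() {
        return Objects.hash(hi, lo);
    }

    // Reference comparison: only true for the shared ZERO object itself.
    static boolean isInvalidByReference(IdSketch id) {
        return id == null || id == ZERO;
    }

    // Value comparison: true for any id whose bits are all zero.
    static boolean isInvalidByValue(IdSketch id) {
        return id == null || ZERO.equals(id);
    }
}
```

With `IdSketch parsed = new IdSketch(0L, 0L)`, the reference check accepts `parsed` as valid while the value check rejects it.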
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398480068 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; + +private ClientMetricsManager() { +this(Time.SYSTEM); +} + +// Visible for testing +ClientMetricsManager(Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.time = time; +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. 
+if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = time.milliseconds(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = time.milliseconds(); +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId,
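One detail of the get path quoted above: `Optional.orElse(generateNewClientId())` evaluates its argument before `orElse` is called, so a new id is generated even when the request already carries a usable one. Whether that cost matters here is a judgment call. The sketch below only demonstrates the evaluation order, using `java.util.UUID` and a hypothetical counting generator in place of Kafka's types.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of eager vs lazy fallback evaluation; names are hypothetical.
class EagerFallbackSketch {
    static final AtomicInteger generated = new AtomicInteger();

    // Hypothetical generator that counts how often it runs.
    static UUID generateNewId() {
        generated.incrementAndGet();
        return UUID.randomUUID();
    }

    // orElse: the fallback expression is evaluated on every call.
    static UUID eager(UUID requested) {
        return Optional.ofNullable(requested).orElse(generateNewId());
    }

    // orElseGet: the supplier runs only when the Optional is empty.
    static UUID lazy(UUID requested) {
        return Optional.ofNullable(requested).orElseGet(EagerFallbackSketch::generateNewId);
    }
}
```

Calling `eager(existingId)` returns `existingId` but still increments the counter; `lazy(existingId)` leaves it unchanged.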
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479956 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; + +private ClientMetricsManager() { +this(Time.SYSTEM); +} + +// Visible for testing +ClientMetricsManager(Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.time = time; +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. 
+if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = time.milliseconds(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = time.milliseconds(); +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId,
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479915 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; + +private ClientMetricsManager() { +this(Time.SYSTEM); +} + +// Visible for testing +ClientMetricsManager(Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.time = time; +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. 
+if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = time.milliseconds(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = time.milliseconds(); +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId,
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479399 ## core/src/main/java/kafka/metrics/ClientMetricsInstance.java: ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Objects; +import java.util.Set; + +/** + * Contains the metrics instance metadata and the state of the client instance. + */ +public class ClientMetricsInstance { + +private final Uuid clientInstanceId; +private final ClientMetricsInstanceMetadata instanceMetadata; +private final int subscriptionId; +private final int subscriptionVersion; +private final Set metrics; +private final int pushIntervalMs; + +private long lastGetRequestEpoch; Review Comment: Done.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479369 ## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ## @@ -0,0 +1,951 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; +import kafka.utils.TestUtils; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsManagerTest { + +private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsManagerTest.class); + +private Properties props; +private KafkaConfig config; +private MockTime 
time; +private ClientMetricsManager clientMetricsManager; + +@BeforeEach +public void setUp() { +props = TestUtils.createDummyBrokerConfig(); +props.setProperty(KafkaConfig.ClientTelemetryMaxBytesProp(), "100"); +config = new KafkaConfig(props); +time = new MockTime(); +clientMetricsManager = new ClientMetricsManager(config, time); +} + +@Test +public void testUpdateSubscription() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); + +assertEquals(0, clientMetricsManager.subscriptionUpdateVersion()); +clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + +assertEquals(1, clientMetricsManager.subscriptions().size()); +assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + +SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1"); +Set metrics = subscriptionInfo.metrics(); + +// Validate metrics. +assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> +assertTrue(metrics.contains(metric))); +// Validate push interval. + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), +String.valueOf(subscriptionInfo.intervalMs())); + +// Validate match patterns. + assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), +subscriptionInfo.matchPattern().size()); +ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { +String[] split = pattern.split("="); +assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); +assertEquals(split[1],
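The test setup above passes a `MockTime` into the manager so that time-dependent checks can be driven by the test, not by the wall clock. The same pattern can be sketched without Kafka's classes: inject a clock and advance it by hand. `PushIntervalSketch` and `tryAccept` are hypothetical names, and the rule shown (reject a request that arrives before the push interval has elapsed, without recording the rejected attempt) is a simplification, not the PR's exact validation logic.

```java
import java.util.function.LongSupplier;

// Hypothetical interval check with an injectable clock; not the PR's validation code.
class PushIntervalSketch {
    private final LongSupplier clock;
    private final long pushIntervalMs;
    private long lastAcceptedMs = -1L;

    PushIntervalSketch(LongSupplier clock, long pushIntervalMs) {
        this.clock = clock;
        this.pushIntervalMs = pushIntervalMs;
    }

    // Returns true if the request is accepted, false if it arrived too early.
    // Simplification: a rejected request does not update the last-accepted time.
    synchronized boolean tryAccept() {
        long now = clock.getAsLong();
        if (lastAcceptedMs >= 0 && now - lastAcceptedMs < pushIntervalMs) {
            return false;
        }
        lastAcceptedMs = now;
        return true;
    }
}
```

A test can hold the current time in a mutable variable, pass `() -> now[0]` as the clock, and step it forward between calls; production code would pass `System::currentTimeMillis`.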
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479353 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); -private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; + +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final KafkaConfig config; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; -public static ClientMetricsManager instance() { -return INSTANCE; +public ClientMetricsManager(KafkaConfig config, Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.config = config; +this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. 
+ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479319 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); -private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; + +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final KafkaConfig config; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; -public static ClientMetricsManager instance() { -return INSTANCE; +public ClientMetricsManager(KafkaConfig config, Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.config = config; +this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. 
+ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1398477241 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397751986 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; + +private ClientMetricsManager() { +this(Time.SYSTEM); +} + +// Visible for testing +ClientMetricsManager(Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.time = time; +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. 
+if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = time.milliseconds(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = time.milliseconds(); +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId,
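The removal path of `updateSubscription` quoted above can be sketched in isolation. This is a minimal sketch, not the PR's code: `SubscriptionRegistry`, `update`, `contains`, and `version` are hypothetical names, plain `Properties` stands in for `ClientMetricsConfigs`, validation is omitted, and bumping the version on every non-empty update is an assumption, since `updateClientSubscription` is not shown in the quoted diff.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the subscription bookkeeping in ClientMetricsManager.
final class SubscriptionRegistry {
    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    // Incremented whenever the set of subscriptions changes.
    private final AtomicInteger updateVersion = new AtomicInteger(0);

    void update(String name, Properties properties) {
        // An empty config means every key of the subscription was deleted.
        if (properties.isEmpty()) {
            // remove() returns null when the name was unknown; only a real
            // removal counts as a change.
            if (subscriptions.remove(name) != null) {
                updateVersion.incrementAndGet();
            }
            return;
        }
        // Assumption: any non-empty update is treated as a change.
        subscriptions.put(name, properties);
        updateVersion.incrementAndGet();
    }

    boolean contains(String name) {
        return subscriptions.containsKey(name);
    }

    int version() {
        return updateVersion.get();
    }
}

final class SubscriptionSketch {
    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        Properties props = new Properties();
        props.setProperty("interval.ms", "5000"); // illustrative key and value
        registry.update("sub-1", props);              // add: version 1
        registry.update("sub-1", new Properties());   // remove: version 2
        registry.update("unknown", new Properties()); // ignored: version stays 2
        System.out.println(registry.contains("sub-1") + " " + registry.version()); // false 2
    }
}
```

Using the return value of `remove` folds the `containsKey` check and the removal into one map operation, which avoids a window between the two calls on a concurrent map.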
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817091571 Thanks @junrao for leaving the comments; I have tried to address them.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397875576 ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0, +null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS); +} + +@Test +public void testMaybeUpdateRequestEpochValid() { +// First request should be accepted. 
+ assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis())); Review Comment: The class and methods under test hold no `Time` reference; they only update the epoch that is passed in, so I didn't see any value in using MockTime here. However, I did change the code and tests for ClientMetricsManager, where these methods are invoked: ClientMetricsManager now uses `Time`, and ClientMetricsManagerTest uses the corresponding MockTime, which eliminated the use of `Thread.sleep`. Please let me know if I am missing anything here.
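The point about replacing `Thread.sleep` with an injected clock can be illustrated with a small standalone sketch. Everything here is hypothetical and simplified: `Clock`, `ManualClock`, and `IntervalGate` are illustrative names, not Kafka classes; `ManualClock` only plays the role that MockTime plays in the PR's tests, and `IntervalGate` is an assumed stand-in for the manager's minimum-interval check.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal clock abstraction; Kafka's own Time interface is richer.
interface Clock {
    long milliseconds();
}

// Hand-advanced clock for tests: time moves only when sleep() is called.
final class ManualClock implements Clock {
    private final AtomicLong nowMs = new AtomicLong(0);

    @Override
    public long milliseconds() {
        return nowMs.get();
    }

    // Advances the clock instantly instead of blocking the thread.
    void sleep(long ms) {
        nowMs.addAndGet(ms);
    }
}

// Hypothetical stand-in for the manager: rejects a request that arrives
// before the configured interval has elapsed since the last accepted one.
final class IntervalGate {
    private final Clock clock;
    private final long intervalMs;
    private long lastAcceptedMs = -1; // -1 means nothing accepted yet

    IntervalGate(Clock clock, long intervalMs) {
        this.clock = clock;
        this.intervalMs = intervalMs;
    }

    synchronized boolean tryAccept() {
        long now = clock.milliseconds();
        if (lastAcceptedMs >= 0 && now - lastAcceptedMs < intervalMs) {
            return false;
        }
        lastAcceptedMs = now;
        return true;
    }
}

final class ClockSketch {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        IntervalGate gate = new IntervalGate(clock, 1000);
        System.out.println(gate.tryAccept()); // true: first request
        System.out.println(gate.tryAccept()); // false: interval not elapsed
        clock.sleep(1000);                    // no real waiting
        System.out.println(gate.tryAccept()); // true: interval elapsed
    }
}
```

Because the component reads time only through the injected clock, the test is deterministic and finishes immediately. A class that merely receives a timestamp as an argument, like the one discussed in the comment, needs no clock at all.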
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397873419 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: I have moved the `ClientMetricsManager` static instance and removed the code from ControllerServer.scala; initialization now happens only in `BrokerServer.scala`. Please let me know your thoughts on `ClientMetricsReceiverPlugin`.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397871398 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: I have removed the conditional logic in this class and now pass `0` as the throttle time for error responses in ClientMetricsManager. Done.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: I noticed that sometimes we do set `throttleTimeMs` with an error. However, this only happens in KafkaApis with the generic request throttling logic. So, for now, I'd recommend that we get rid of this check here and rely on the caller to set `throttleTimeMs` properly. Specifically, for all `errorResponse(int throttleTimeMs, Errors errors)` and `getErrorResponse(int throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass in 0 for `throttleTimeMs`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
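The recommendation above, dropping the error-specific check from the response builder and letting the caller decide the throttle time, might look roughly like the following. This is a sketch under stated assumptions rather than the merged code: `ErrorCode`, `TelemetryResponse`, `managerError`, and `managerSuccess` are hypothetical, simplified stand-ins for Kafka's `Errors`, `PushTelemetryResponse`, and the call sites in `ClientMetricsManager`.

```java
// Hypothetical, simplified stand-in for Kafka's Errors enum.
enum ErrorCode { NONE, INVALID_REQUEST, THROTTLING_QUOTA_EXCEEDED }

// Hypothetical, simplified stand-in for PushTelemetryResponse.
final class TelemetryResponse {
    final ErrorCode error;
    final int throttleTimeMs;

    TelemetryResponse(ErrorCode error, int throttleTimeMs) {
        this.error = error;
        this.throttleTimeMs = throttleTimeMs;
    }
}

final class ThrottleSketch {
    // The builder no longer special-cases any error; it copies whatever
    // throttle time the caller supplies.
    static TelemetryResponse errorResponse(int throttleTimeMs, ErrorCode error) {
        return new TelemetryResponse(error, throttleTimeMs);
    }

    // Manager-side error path: always pass 0 so the client does not mute
    // its channel for unrelated requests.
    static TelemetryResponse managerError(ErrorCode error) {
        return errorResponse(0, error);
    }

    // Success path: the throttle time from generic request throttling is kept.
    static TelemetryResponse managerSuccess(int throttleTimeMs) {
        return new TelemetryResponse(ErrorCode.NONE, throttleTimeMs);
    }

    public static void main(String[] args) {
        TelemetryResponse rejected = managerError(ErrorCode.THROTTLING_QUOTA_EXCEEDED);
        TelemetryResponse accepted = managerSuccess(25);
        System.out.println(rejected.error + " " + rejected.throttleTimeMs); // THROTTLING_QUOTA_EXCEEDED 0
        System.out.println(accepted.error + " " + accepted.throttleTimeMs); // NONE 25
    }
}
```

Keeping the policy in the caller means the request class stays a plain data mapper, and every error path in the manager makes the zero throttle time explicit at the call site.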
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: As discussed earlier, for `THROTTLING_QUOTA_EXCEEDED`, we shouldn't set `throttleTimeMs` since we don't want the client to mute the channel for all requests. Also, I noticed that sometimes we do set `throttleTimeMs` with an error. However, this only happens in KafkaApis with the generic request throttling logic. So, for now, I'd recommend that we get rid of this check here and rely on the caller to set `throttleTimeMs` properly. For example, for all `errorResponse(int throttleTimeMs, Errors errors) `and `getErrorResponse(int throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass in 0 for `throttleTimeMs`. 
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397795152 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: Yeah, I re-checked the code today and we shouldn't require any change in ControllerServer.scala as the APIs are not supported on controller and we won't require even KIP-1000 to be supported on controllers. 
I am making the change to initialize ClientMetricsManager only from KafkaServer.scala. What do you think about `ClientMetricsReceiverPlugin`, based on its usage in the tagged PR above?
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: We shouldn't set throttleTimeMs even for Errors.THROTTLING_QUOTA_EXCEEDED. throttleTimeMs can only be set when there is no error. ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: Hmm, why is ClientMetricsManager needed in the controller? I thought only brokers can receive get/push telemetric requests. ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +clientInstance = new
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396501916

## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());

Review Comment:
Yes, I just meant merging lines 37 and 38.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396500493

## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ##
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {

     @Override
     public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        PushTelemetryResponseData responseData = new PushTelemetryResponseData()
-            .setErrorCode(Errors.forException(e).code())
-            .setThrottleTimeMs(throttleTimeMs);
-        return new PushTelemetryResponse(responseData);
+        return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
Got it. Since getErrorResponse is an override of a base method, we can leave it as it is.
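The delegation shown in this diff can be sketched in isolation. The sketch below is illustrative only: `SketchError`, `SketchResponse`, `SketchBaseRequest` and `SketchTelemetryRequest` are hypothetical stand-ins for Kafka's `Errors`, `PushTelemetryResponse`, the base request class and `PushTelemetryRequest`, and the exception-to-error mapping is an assumption made for the example, not Kafka's actual mapping.

```java
// Hypothetical stand-in for Kafka's Errors enum (assumption, not the real API).
enum SketchError {
    NONE, INVALID_REQUEST, UNKNOWN_SERVER_ERROR;

    // Maps an exception to an error code; the mapping here is illustrative only.
    static SketchError forException(Throwable e) {
        return (e instanceof IllegalArgumentException) ? INVALID_REQUEST : UNKNOWN_SERVER_ERROR;
    }
}

// Hypothetical stand-in for a response carrying a throttle time and an error.
final class SketchResponse {
    final int throttleTimeMs;
    final SketchError error;

    SketchResponse(int throttleTimeMs, SketchError error) {
        this.throttleTimeMs = throttleTimeMs;
        this.error = error;
    }
}

// The base class fixes the signature of getErrorResponse, so the override keeps it.
abstract class SketchBaseRequest {
    abstract SketchResponse getErrorResponse(int throttleTimeMs, Throwable e);
}

final class SketchTelemetryRequest extends SketchBaseRequest {
    @Override
    SketchResponse getErrorResponse(int throttleTimeMs, Throwable e) {
        // Delegate to the helper instead of building the response inline.
        return errorResponse(throttleTimeMs, SketchError.forException(e));
    }

    // Helper usable directly by callers that already hold an error code.
    SketchResponse errorResponse(int throttleTimeMs, SketchError error) {
        return new SketchResponse(throttleTimeMs, error);
    }
}
```

Keeping the override as a thin wrapper means callers that start from an exception and callers that start from an error code share one construction path.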
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1815015828

Thanks for reviewing @junrao, I have updated the PR.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396171173

## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());
+        clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+            null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+    }
+
+    @Test
+    public void testMaybeUpdateRequestEpochValid() {
+        // First request should be accepted.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+        ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+            null, 2);
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // sleep for 3 ms to ensure that the next request is accepted.
+        Thread.sleep(3);
+        // Second request should be accepted as time since last request is greater than the retry interval.

Review Comment:
Done.
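The quoted test sleeps for 3 ms so that the second request falls outside the 2 ms interval. Because `maybeUpdateGetRequestEpoch` already takes the current time as an argument, one way to test the same behaviour without `Thread.sleep` is to pass explicit timestamps. The sketch below illustrates the idea with a hypothetical `ThrottledInstanceSketch` class; its interval comparison is an assumption for the example and is not taken from the real `ClientMetricsInstance`.

```java
// Hypothetical stand-in for the rate-limiting part of ClientMetricsInstance.
final class ThrottledInstanceSketch {
    private final long intervalMs;
    private long lastRequestMs; // 0 means no request has been seen yet

    ThrottledInstanceSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Accepts the request and records its time only if more than
    // intervalMs has elapsed since the last accepted request.
    synchronized boolean maybeUpdateRequestEpoch(long nowMs) {
        if (nowMs - lastRequestMs > intervalMs) {
            lastRequestMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ThrottledInstanceSketch instance = new ThrottledInstanceSketch(2);
        long start = 1_000L; // arbitrary fixed timestamp, no wall clock involved
        System.out.println(instance.maybeUpdateRequestEpoch(start));     // first request
        System.out.println(instance.maybeUpdateRequestEpoch(start));     // immediate retry
        System.out.println(instance.maybeUpdateRequestEpoch(start + 3)); // after the interval
    }
}
```

With fixed timestamps the test is deterministic and does not depend on scheduler timing, which tends to matter on loaded CI machines.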
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396170895

## core/src/main/java/kafka/server/ClientMetricsManager.java: ##
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
     private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));

     public static ClientMetricsManager instance() {
         return INSTANCE;
     }

+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache clientInstanceCache;
+    private final Map subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next telemetry request needs
+    // to re-evaluate the subscription id as per changes subscriptions.
+    private volatile long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+    }

     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate that there is a change
+         in the subscription. This will be used to determine if the next telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If subscription has changed
+         since the last request, then the client instance will be re-evaluated. Validation of the
+         request will be done after the client instance is created. If client issued get telemetry
+         request prior to push interval, then the client should get a throttle error but if the
+         subscription has changed since the last request then the client should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {

Review Comment:
I have moved the check to Uuid.RESERVED, done.
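The quoted condition compares the client instance id with `==`, which tests object identity rather than value; an id decoded from a request is not necessarily the same object as the constant. The sketch below shows the difference and a value-based check against a set of reserved ids. It uses `java.util.UUID` as a stand-in for Kafka's `Uuid`, and the contents of the `RESERVED` set here are assumptions for illustration, not the real definition of `Uuid.RESERVED`.

```java
import java.util.Set;
import java.util.UUID;

final class ReservedIdCheckSketch {
    // Hypothetical reserved ids, standing in for Kafka's reserved Uuid values.
    static final UUID ZERO_ID = new UUID(0L, 0L);
    static final Set<UUID> RESERVED = Set.of(ZERO_ID, new UUID(0L, 1L));

    // Value-based check: rejects null and any id equal to a reserved one.
    static boolean isInvalid(UUID id) {
        return id == null || RESERVED.contains(id);
    }

    public static void main(String[] args) {
        UUID decoded = new UUID(0L, 0L); // equal to ZERO_ID but a distinct object
        System.out.println(decoded == ZERO_ID);      // reference comparison
        System.out.println(decoded.equals(ZERO_ID)); // value comparison
        System.out.println(isInvalid(decoded));
        System.out.println(isInvalid(UUID.randomUUID()));
    }
}
```

A set lookup relies on `equals` and `hashCode`, so it stays correct regardless of how the id object was created.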
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396168530 ## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ## @@ -0,0 +1,921 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsManagerTest { + +private ClientMetricsManager clientMetricsManager; + +@BeforeEach +public void setUp() { +clientMetricsManager = new ClientMetricsManager(); +} + +@Test +public void testUpdateSubscription() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); + +long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch(); 
+clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + +assertEquals(1, clientMetricsManager.subscriptions().size()); +assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + +SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1"); +Set metrics = subscriptionInfo.metrics(); + +// Validate metrics. +assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> +assertTrue(metrics.contains(metric))); +// Validate push interval. + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), +String.valueOf(subscriptionInfo.intervalMs())); + +// Validate match patterns. + assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), +subscriptionInfo.matchPattern().size()); +ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { +String[] split = pattern.split("="); +assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); +assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern()); +}); +assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch); +} + +@Test +public void testUpdateSubscriptionWithEmptyProperties() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); +long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch(); +clientMetricsManager.updateSubscription("sub-1", new Properties()); +// No subscription should be added as the properties are empty. +assertEquals(0, clientMetricsManager.subscriptions().size()); +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396168208 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private volatile long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. 
+if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. +*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance =
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396167813

## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ##
@@ -0,0 +1,921 @@
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396165846

## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ##
@@ -0,0 +1,921 @@
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396164589

## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+    private Uuid uuid;
+    private ClientMetricsInstanceMetadata instanceMetadata;
+    private ClientMetricsInstance clientInstance;
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        uuid = Uuid.randomUuid();
+        instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+            ClientMetricsTestUtils.requestContext());
+        clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+            null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+    }
+
+    @Test
+    public void testMaybeUpdateRequestEpochValid() {
+        // First request should be accepted.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+        ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+            null, 2);
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // sleep for 3 ms to ensure that the next request is accepted.
+        Thread.sleep(3);
+        // Second request should be accepted as time since last request is greater than the retry interval.
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdateGetRequestWithImmediateRetryFail() {
+        assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+        // Second request should be rejected as time since last request is less than the retry interval.
+        assertFalse(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+    }
+
+    @Test
+    public void testMaybeUpdatePushRequestAfterElapsedTimeValid() throws InterruptedException {

Review Comment:
The two methods independently check the updates to getTelemetryTime and pushTelemetryTime, respectively. Is the concern that they could be merged into a single test? Since I have now removed the sleep from these methods, do you think it is fine to keep them separate?

## core/src/test/java/kafka/metrics/ClientMetricsReceiverPluginTest.java: ##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import kafka.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396161493 ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0, +null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS); +} + +@Test +public void testMaybeUpdateRequestEpochValid() { +// First request should be accepted. 
+ assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis())); + assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis())); +} + +@Test +public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException { +ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0, +null, 2); + assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis())); +// sleep for 3 ms to ensure that the next request is accepted. +Thread.sleep(3); Review Comment: I have changed the code so we don't require sleep in tests, done. Also started using MockTime in ClientMetricsManager. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
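The reply above about dropping `Thread.sleep` from the tests can be illustrated with a small sketch. This is not the PR's code: `ManualClock` and `IntervalGate` are hypothetical stand-ins (for a mock clock such as Kafka's `MockTime`, and for the per-client interval check in `ClientMetricsInstance`), and the method name `maybeUpdateRequestTimestamp` is made up for the example. The idea is only that the timestamp is passed in, so a test can advance time explicitly instead of sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ManualClockSketch {

    /** Hypothetical mock clock: time only moves when the test advances it. */
    static final class ManualClock {
        private final AtomicLong nowMs = new AtomicLong(0L);

        long milliseconds() {
            return nowMs.get();
        }

        void advance(long ms) {
            nowMs.addAndGet(ms);
        }
    }

    /** Hypothetical per-client state: accepts a request only once the interval has elapsed. */
    static final class IntervalGate {
        private final long intervalMs;
        private long lastRequestMs = -1L; // -1 means no request has been seen yet

        IntervalGate(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        synchronized boolean maybeUpdateRequestTimestamp(long nowMs) {
            if (lastRequestMs >= 0 && nowMs - lastRequestMs < intervalMs) {
                return false; // request arrived before the interval elapsed
            }
            lastRequestMs = nowMs;
            return true;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        IntervalGate gate = new IntervalGate(2);

        // First request is accepted.
        System.out.println(gate.maybeUpdateRequestTimestamp(clock.milliseconds()));
        // Immediate retry is rejected; no real time needs to pass to prove it.
        System.out.println(gate.maybeUpdateRequestTimestamp(clock.milliseconds()));
        // Advance the clock past the interval instead of calling Thread.sleep(3).
        clock.advance(3);
        System.out.println(gate.maybeUpdateRequestTimestamp(clock.milliseconds()));
    }
}
```

With the clock under the test's control, the "immediate retry" and "after elapsed time" cases become deterministic and do not depend on scheduler timing.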
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396160383 ## core/src/test/java/kafka/metrics/ClientMetricsInstanceMetadataTest.java: ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceMetadataTest { + +@Test +public void testIsMatchValid() throws UnknownHostException { +Uuid uuid = Uuid.randomUuid(); +ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +// We consider empty/missing client matching patterns as valid +assertTrue(instanceMetadata.isMatch(Collections.emptyMap())); +assertTrue(instanceMetadata.isMatch(null)); Review Comment: Make sense, user cannot. 
I have removed this test condition and null check in the code as well.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396159445

## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ##
@@ -80,6 +82,11 @@ public class ClientMetricsConfigs {
     public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
     public static final String CLIENT_SOURCE_PORT = "client_source_port";
+    // Empty string in client-metrics resource configs indicates that all the metrics are subscribed.
+    public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

Review Comment:
I have switched to '*' as the identifier. Done.
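As a rough illustration of the '*' identifier mentioned in the reply above, here is a minimal sketch of how a comma-separated metrics config could treat `*` as "all metrics subscribed". Everything in it is an assumption made for the example: the class, the method names, and the exact-name comparison are hypothetical and are not taken from `ClientMetricsConfigs`.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class AllMetricsWildcardSketch {

    // Hypothetical constant mirroring the '*' identifier from the review reply.
    static final String ALL_SUBSCRIBED_METRICS = "*";

    /** Splits a comma-separated config value into a set of trimmed, non-empty entries. */
    static Set<String> parse(String config) {
        return Arrays.stream(config.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /** True when the config contains the wildcard entry. */
    static boolean subscribesToAll(Set<String> metrics) {
        return metrics.contains(ALL_SUBSCRIBED_METRICS);
    }

    /** Assumption for this sketch: a metric is subscribed if the wildcard is present or its name is listed exactly. */
    static boolean isSubscribed(Set<String> metrics, String metricName) {
        return subscribesToAll(metrics) || metrics.contains(metricName);
    }

    public static void main(String[] args) {
        Set<String> all = parse("*");
        Set<String> some = parse("metric.a, metric.b");
        System.out.println(isSubscribed(all, "metric.z"));  // wildcard covers any name
        System.out.println(isSubscribed(some, "metric.a")); // listed explicitly
        System.out.println(isSubscribed(some, "metric.z")); // not listed, no wildcard
    }
}
```

An explicit token such as `*` is easier to tell apart from "no value configured" than an empty or quoted-empty string, which is presumably part of why the identifier was changed.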
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396158662 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); Review Comment: I have removed the throttle time THROTTLING_QUOTA_EXCEEDED as per the explanation in other comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396158035

## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
         return data;
     }
+    public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
+        PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+        responseData.setErrorCode(errors.code());
+        responseData.setThrottleTimeMs(throttleTimeMs);

Review Comment:
Thanks for the explanation @junrao, this is helpful. I have removed throttleTimeMs for the case where a THROTTLING_QUOTA_EXCEEDED exception is thrown in the telemetry APIs. Done.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396154104 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1396148679 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: Make sense, for `ClientMetricsManager` the class is needed to be in BrokerServer.scala and ControllerServer.scala as the client-metrics resource reload has to be managed at a singleton instance. Shall I add the changes in SharedServer.scala to share the same class? 
For ClientMetricsReceiverPlugin, here is the PR that uses it: https://github.com/apache/kafka/pull/14767/files#diff-019dd06900a2f4714d626ffb071466765413970462f36a86e8a37c0ef9c0afb9. The class is required while defining DynamicBrokerConfig, so I thought to make it a singleton as well.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396143360

## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ##
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {
     @Override
     public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        PushTelemetryResponseData responseData = new PushTelemetryResponseData()
-                .setErrorCode(Errors.forException(e).code())
-                .setThrottleTimeMs(throttleTimeMs);
-        return new PushTelemetryResponse(responseData);
+        return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
I may be missing something here. The method to override has to be `getErrorResponse`; I only added another method, `errorResponse`, which helps build the response. Is there anything I need to address here?
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1395583845 ## core/src/test/java/kafka/metrics/ClientMetricsInstanceMetadataTest.java: ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceMetadataTest { + +@Test +public void testIsMatchValid() throws UnknownHostException { +Uuid uuid = Uuid.randomUuid(); +ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +// We consider empty/missing client matching patterns as valid +assertTrue(instanceMetadata.isMatch(Collections.emptyMap())); +assertTrue(instanceMetadata.isMatch(null)); Review Comment: Good question. `kafka-client-metrics.sh --alter --name CM1 --match ""` deletes the map. 
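The quoted test treats an empty match map as matching every client, and the comment above notes that `--match ""` deletes the map. A minimal sketch of that semantics follows, assuming a plain attribute map per client; `MatchSketch` and its `isMatch` signature are hypothetical and differ from `ClientMetricsInstanceMetadata.isMatch`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class MatchSketch {

    /**
     * Returns true when every configured pattern matches the corresponding client attribute.
     * An empty pattern map places no constraint, so it matches every client.
     */
    static boolean isMatch(Map<String, String> clientAttributes, Map<String, Pattern> patterns) {
        for (Map.Entry<String, Pattern> entry : patterns.entrySet()) {
            String value = clientAttributes.get(entry.getKey());
            if (value == null || !entry.getValue().matcher(value).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> client = new HashMap<>();
        client.put("client_id", "app-1");

        Map<String, Pattern> patterns = new HashMap<>();
        patterns.put("client_id", Pattern.compile("app-.*"));

        System.out.println(isMatch(client, Collections.emptyMap())); // no patterns: matches
        System.out.println(isMatch(client, patterns));               // pattern matches the client id
        patterns.put("client_id", Pattern.compile("svc-.*"));
        System.out.println(isMatch(client, patterns));               // pattern no longer matches
    }
}
```

Because the loop never runs for an empty map, no separate null or empty check is needed once callers are guaranteed to pass a non-null map.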
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1395220263 ## core/src/main/java/kafka/metrics/ClientMetricsInstance.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Objects; +import java.util.Set; + +/** + * Contains the metrics instance metadata and the state of the client instance. 
+ */
+public class ClientMetricsInstance {
+
+    private final Uuid clientInstanceId;
+    private final ClientMetricsInstanceMetadata instanceMetadata;
+    private final int subscriptionId;
+    private final long subscriptionUpdateEpoch;
+    private final Set metrics;
+    private final int pushIntervalMs;
+
+    private long lastGetRequestEpoch;
+    private long lastPushRequestEpoch;
+    private volatile boolean terminating;
+    private volatile Errors lastKnownError;
+
+    public ClientMetricsInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata,
+        int subscriptionId, long subscriptionUpdateEpoch, Set metrics, int pushIntervalMs) {
+        this.clientInstanceId = Objects.requireNonNull(clientInstanceId);
+        this.instanceMetadata = Objects.requireNonNull(instanceMetadata);
+        this.subscriptionId = subscriptionId;
+        this.subscriptionUpdateEpoch = subscriptionUpdateEpoch;
+        this.metrics = metrics;
+        this.terminating = false;
+        this.pushIntervalMs = pushIntervalMs;
+        this.lastKnownError = Errors.NONE;
+    }
+
+    public Uuid clientInstanceId() {
+        return clientInstanceId;
+    }
+
+    public ClientMetricsInstanceMetadata instanceMetadata() {
+        return instanceMetadata;
+    }
+
+    public int pushIntervalMs() {
+        return pushIntervalMs;
+    }
+
+    public long subscriptionUpdateEpoch() {
+        return subscriptionUpdateEpoch;
+    }
+
+    public int subscriptionId() {
+        return subscriptionId;
+    }
+
+    public Set metrics() {
+        return metrics;
+    }
+
+    public boolean terminating() {
+        return terminating;

Review Comment:
I did not remove the instance from the in-memory cache because subsequent requests from a terminated client should still be rejected. That said, a terminated client should not remain in memory forever, so it should be removed from the cache according to the cache's eviction policy, i.e. `MAX(60*1000, PushIntervalMs * 3)` milliseconds. This will be handled by the cache improvement task, which introduces a time-based eviction policy.
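The eviction rule quoted above, `MAX(60*1000, PushIntervalMs * 3)` milliseconds, can be worked through in a few lines. This is only a sketch of the arithmetic: `EvictionTimeoutSketch`, `evictionTimeoutMs`, and `isExpired` are hypothetical names, and the cache improvement task referred to in the comment may implement the policy differently.

```java
public class EvictionTimeoutSketch {

    // Lower bound from the comment: 60 * 1000 ms.
    static final long MIN_EVICTION_MS = 60 * 1000L;

    /** Eviction timeout as described in the comment: MAX(60*1000, pushIntervalMs * 3). */
    static long evictionTimeoutMs(int pushIntervalMs) {
        return Math.max(MIN_EVICTION_MS, 3L * pushIntervalMs);
    }

    /** Hypothetical helper: an entry is expired once it has been idle for the full timeout. */
    static boolean isExpired(long lastRequestMs, long nowMs, int pushIntervalMs) {
        return nowMs - lastRequestMs >= evictionTimeoutMs(pushIntervalMs);
    }

    public static void main(String[] args) {
        System.out.println(evictionTimeoutMs(5_000));   // prints 60000: the 60 s floor applies
        System.out.println(evictionTimeoutMs(300_000)); // prints 900000: three push intervals
    }
}
```

For short push intervals the 60-second floor dominates, so a terminated client would stay in the cache, and keep being rejected, for at least a minute.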
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1395204343 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private volatile long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. 
+if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. +*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance =
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1395171995 ## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ## @@ -0,0 +1,921 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsManagerTest { + +private ClientMetricsManager clientMetricsManager; + +@BeforeEach +public void setUp() { +clientMetricsManager = new ClientMetricsManager(); +} + +@Test +public void testUpdateSubscription() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); + +long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch(); 
+clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + +assertEquals(1, clientMetricsManager.subscriptions().size()); +assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + +SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1"); +Set metrics = subscriptionInfo.metrics(); + +// Validate metrics. +assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> +assertTrue(metrics.contains(metric))); +// Validate push interval. + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), +String.valueOf(subscriptionInfo.intervalMs())); + +// Validate match patterns. + assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), +subscriptionInfo.matchPattern().size()); +ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { +String[] split = pattern.split("="); +assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); +assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern()); +}); +assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch); +} + +@Test +public void testUpdateSubscriptionWithEmptyProperties() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); +long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch(); +clientMetricsManager.updateSubscription("sub-1", new Properties()); +// No subscription should be added as the properties are empty. +assertEquals(0, clientMetricsManager.subscriptions().size()); +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1395166691 ## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ## @@ -0,0 +1,921 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsManagerTest { + +private ClientMetricsManager clientMetricsManager; + +@BeforeEach +public void setUp() { +clientMetricsManager = new ClientMetricsManager(); +} + +@Test +public void testUpdateSubscription() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); + +long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch(); 
+clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties()); + +assertEquals(1, clientMetricsManager.subscriptions().size()); +assertNotNull(clientMetricsManager.subscriptionInfo("sub-1")); + +SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1"); +Set metrics = subscriptionInfo.metrics(); + +// Validate metrics. +assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size()); + Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric -> +assertTrue(metrics.contains(metric))); +// Validate push interval. + assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS), +String.valueOf(subscriptionInfo.intervalMs())); + +// Validate match patterns. + assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(), +subscriptionInfo.matchPattern().size()); +ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> { +String[] split = pattern.split("="); +assertTrue(subscriptionInfo.matchPattern().containsKey(split[0])); +assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern()); +}); +assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch); +} + +@Test +public void testUpdateSubscriptionWithEmptyProperties() { +assertTrue(clientMetricsManager.subscriptions().isEmpty()); +long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch(); +clientMetricsManager.updateSubscription("sub-1", new Properties()); +// No subscription should be added as the properties are empty. +assertEquals(0, clientMetricsManager.subscriptions().size()); +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1395156877 ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); Review Comment: I use the `uuid` variable later as well, to check whether `match` succeeds, so I kept it as a separate field. If the suggestion was only about merging lines 37 and 38, I have done that. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1394744738 ## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ## @@ -80,6 +82,11 @@ public class ClientMetricsConfigs { public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; public static final String CLIENT_SOURCE_PORT = "client_source_port"; +// Empty string in client-metrics resource configs indicates that all the metrics are subscribed. +public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\""; Review Comment: It would be possible to change the KIP to support a list containing just `*` to mean all metrics. The KIP today says a list containing just `""` means all metrics. That's a pretty simple change.
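A minimal sketch of the two "all metrics" conventions discussed in this comment, assuming the configured metrics list has already been parsed into a `List<String>` and that entries are matched as metric-name prefixes. `MetricSelectorSketch`, `subscribesToAll`, and `isSelected` are hypothetical names used only for illustration; they are not part of this PR or of Kafka, and accepting `*` reflects the suggested KIP change rather than the behaviour in the diff.

```java
import java.util.List;

// Hypothetical helper, not Kafka code: shows how a single "" or "*" entry
// could be read as "subscribe to every metric".
final class MetricSelectorSketch {

    // Assumption: "*" is the alternative marker proposed in the review comment.
    static final String WILDCARD = "*";

    private MetricSelectorSketch() { }

    // True only when the list has exactly one entry and that entry is "" or "*".
    static boolean subscribesToAll(List<String> metrics) {
        if (metrics.size() != 1) {
            return false;
        }
        String only = metrics.get(0);
        return only.isEmpty() || WILDCARD.equals(only);
    }

    // Assumption: each non-empty entry is treated as a metric-name prefix.
    static boolean isSelected(List<String> metrics, String metricName) {
        if (subscribesToAll(metrics)) {
            return true;
        }
        for (String prefix : metrics) {
            if (!prefix.isEmpty() && metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

With this shape, an empty list selects nothing, while a list holding only `""` or only `*` selects everything, so supporting both markers would not change how explicit prefixes behave.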
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391737904 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); Review Comment: This is an existing issue, but `getErrorResponse` can just be `errorResponse`. ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); Review Comment: merge with previous line? ## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ## @@ -0,0 +1,921 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1809013650 > @apoorvmittal10 : Thanks for the PR. Made a pass of non-testing files. Left a few comments. Thanks a lot for the review, @junrao. I have addressed the comments and left a question about `throttleTimeMs` for error responses in the review comments. Could you please re-review?
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391660088 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final List RECEIVERS = new ArrayList<>(); Review Comment: Done. 
## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659830 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659127 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658586 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658438 ## core/src/main/java/kafka/metrics/ClientMetricsInstance.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Objects; +import java.util.Set; + +/** + * Contains the metrics instance metadata and the state of the client instance. + */ +public class ClientMetricsInstance { + +private final Uuid clientInstanceId; +private final ClientMetricsInstanceMetadata instanceMetadata; +private final int subscriptionId; +private final long subscriptionUpdateEpoch; +private final Set metrics; +private final int pushIntervalMs; + +private boolean terminating; +private long lastGetRequestEpoch; +private long lastPushRequestEpoch; +private Errors lastKnownError; Review Comment: I have marked two of them `volatile`, and the other two are now protected with `synchronized`. I have also added concurrency tests to validate that, when multiple requests are received for the same client instance, only a single request is processed and the others are rejected with a throttling error. 
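The split described in the comment above (some fields `volatile`, others guarded by `synchronized`) can be illustrated with a minimal sketch. This is not the code from the PR: `InstanceState`, `tryAcceptPush` and `acceptedOutOf` are hypothetical names, and the choice of which fields get which treatment is an assumption, since the comment does not say which two fields are which.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class InstanceStateSketch {

    // Hypothetical stand-in for the mutable part of ClientMetricsInstance.
    static final class InstanceState {
        private final int pushIntervalMs;
        // Assumption: a flag that is only set and read needs visibility, not locking.
        private volatile boolean terminating;
        // Assumption: this value is checked and then updated, so it is guarded by the lock.
        private long lastPushRequestEpoch;

        InstanceState(int pushIntervalMs) {
            this.pushIntervalMs = pushIntervalMs;
        }

        boolean terminating() {
            return terminating;
        }

        void terminating(boolean terminating) {
            this.terminating = terminating;
        }

        // Atomic check-then-act: at most one caller per push interval is accepted.
        // An epoch of 0 is used here to mean that no push has been seen yet.
        synchronized boolean tryAcceptPush(long now) {
            if (lastPushRequestEpoch != 0 && now - lastPushRequestEpoch < pushIntervalMs) {
                return false; // the caller would answer with a throttling error
            }
            lastPushRequestEpoch = now;
            return true;
        }
    }

    // Fires `requests` concurrent pushes carrying the same timestamp and counts the accepted ones.
    static int acceptedOutOf(int requests) {
        InstanceState state = new InstanceState(5_000);
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(requests);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (state.tryAcceptPush(1_000L)) {
                    accepted.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return accepted.get();
    }

    public static void main(String[] args) {
        System.out.println(acceptedOutOf(8));
    }
}
```

Without `synchronized` on `tryAcceptPush`, two threads could both pass the interval check before either one records its timestamp, which is the race the added concurrency tests are meant to rule out.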
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656533 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; Review Comment: Makes sense, thanks. Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656238 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391654569 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,12 +16,48 @@ */ package kafka.server; +import java.util.Collections; Review Comment: I kept this in the `kafka.server` package because all the managers (in Scala) that process API calls from KafkaApis.scala reside in the `kafka.server` package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391652781 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -71,6 +73,31 @@ public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +responseData.setThrottleTimeMs(throttleTimeMs); +return new PushTelemetryResponse(responseData); +} + +public String getMetricsContentType() { Review Comment: Missed these, thanks. Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391651232 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391650483 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); Review Comment: I asked the same question in another comment, but shouldn't we set that, since the API call goes through `sendMaybeThrottle` in KafkaApis, which throttles the request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391648051 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
+ */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: > Do we need to make this an LRU cache? If a client is terminated or idle, we should remove them from the cache I have started with an LRU cache and plan to improve this with a cache that time-bounds the connection. The KIP says: `client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds`. I'll add an improvement to the cache to respect that: https://issues.apache.org/jira/browse/KAFKA-15813 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
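The retention rule quoted from the KIP, `MAX(60*1000, PushIntervalMs * 3)`, is simple enough to sketch directly. The class and method names below (`InstanceRetentionSketch`, `retentionMs`, `isExpired`) are hypothetical, and this is only an illustration of the formula, not the time-bounded cache tracked in KAFKA-15813.

```java
public class InstanceRetentionSketch {

    // Lower bound on retention from the KIP text: 60 * 1000 milliseconds.
    static final long MIN_RETENTION_MS = 60 * 1000L;

    // MAX(60*1000, PushIntervalMs * 3), computed in long to avoid int overflow.
    static long retentionMs(int pushIntervalMs) {
        return Math.max(MIN_RETENTION_MS, pushIntervalMs * 3L);
    }

    // Hypothetical expiry check: the instance state is stale once the retention window has passed.
    static boolean isExpired(long lastRequestEpochMs, int pushIntervalMs, long nowMs) {
        return nowMs - lastRequestEpochMs > retentionMs(pushIntervalMs);
    }

    public static void main(String[] args) {
        System.out.println(retentionMs(5_000));   // 60000: the one-minute floor applies
        System.out.println(retentionMs(30_000));  // 90000: three push intervals
    }
}
```

For short push intervals the one-minute floor dominates, so under this rule a client pushing every 5 seconds would still have its state kept for 60 seconds after its last request.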
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391640685 ## core/src/main/java/kafka/metrics/ClientMetricsInstanceMetadata.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import java.util.regex.Pattern; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.requests.RequestContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Information from the client's metadata is gathered from the client's request. + */ +public class ClientMetricsInstanceMetadata { + +private final Map attributesMap; + +public ClientMetricsInstanceMetadata(Uuid clientInstanceId, RequestContext requestContext) { +Objects.requireNonNull(clientInstanceId); +Objects.requireNonNull(requestContext); + +attributesMap = new HashMap<>(); + +attributesMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); +attributesMap.put(ClientMetricsConfigs.CLIENT_ID, requestContext.clientId()); +attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, requestContext.clientInformation != null ? 
+requestContext.clientInformation.softwareName() : null); +attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, requestContext.clientInformation != null ? +requestContext.clientInformation.softwareVersion() : null); +attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, requestContext.clientAddress != null ? +requestContext.clientAddress.getHostAddress() : null); +// KIP-714 mentions client source port should be the client connection's source port from the +// broker's point of view. But the broker does not have this information rather the port could be +// the broker's port where the client connection is established. We might want to consider removing +// the client source port from the KIP or use broker port if that can be helpful. +// TODO: fix port Review Comment: Thanks a lot @junrao this is helpful. I will make the changes in subsequent PR to address this. I have created following jira for same: https://issues.apache.org/jira/browse/KAFKA-15811 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391639061 ## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ## @@ -80,6 +82,11 @@ public class ClientMetricsConfigs { public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; public static final String CLIENT_SOURCE_PORT = "client_source_port"; +// Empty string in client-metrics resource configs indicates that all the metrics are subscribed. +public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\""; Review Comment: The KIP says that the subscription response should contain just an empty string, i.e. `""`. But when creating a `client-metrics` resource, ConfigDef parses `""` as no data, so to specify an empty string through `kafka-configs.sh` we need to pass it as an empty string enclosed in a string, i.e. `"\"\""`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
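To make the quoting in the comment above concrete: the Java literal `"\"\""` is a two-character string consisting of two double quotes, which survives config parsing where a truly empty value would not. The sketch below shows one way such a sentinel could be mapped back to the empty string that the KIP expects in the response. The helper `toSubscribedPrefixes` is hypothetical and is not taken from the PR; only the constant's value comes from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AllMetricsConfigSketch {

    // Same value as ALL_SUBSCRIBED_METRICS_CONFIG in the diff: the two characters "".
    static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

    // Hypothetical helper: if the sentinel is configured, subscribe to everything by
    // returning a single empty-string entry; otherwise keep the configured list as-is.
    static List<String> toSubscribedPrefixes(List<String> configuredMetrics) {
        if (configuredMetrics.contains(ALL_SUBSCRIBED_METRICS_CONFIG)) {
            return Collections.singletonList("");
        }
        return configuredMetrics;
    }

    public static void main(String[] args) {
        // The sentinel is two characters long, not zero.
        System.out.println(ALL_SUBSCRIBED_METRICS_CONFIG.length());
        // Example metric names below are illustrative only.
        System.out.println(toSubscribedPrefixes(Arrays.asList("org.apache.kafka.producer.")));
        System.out.println(toSubscribedPrefixes(Arrays.asList(ALL_SUBSCRIBED_METRICS_CONFIG)).get(0).isEmpty());
    }
}
```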
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391636101 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -71,6 +73,31 @@ public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +responseData.setThrottleTimeMs(throttleTimeMs); +return new PushTelemetryResponse(responseData); +} + +public String getMetricsContentType() { +// Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type +// field to allow for updated OTLP format versions (or additional formats), but this field is currently not +// included since only one format is specified in the current proposal of the kip-714 +return OTLP_CONTENT_TYPE; +} + +public ByteBuffer getMetricsData() { +CompressionType cType = CompressionType.forId(this.data.compressionType()); +return (cType == CompressionType.NONE) ? +ByteBuffer.wrap(this.data.metrics()) : decompressMetricsData(cType, this.data.metrics()); +} + +private static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) { +// TODO: Add support for decompression of metrics data Review Comment: Yes, I have created a Jira under the parent KIP-714 task to address this: https://issues.apache.org/jira/browse/KAFKA-15807. I plan to get the end-to-end metrics flow working without compression first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391634085 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -71,6 +73,31 @@ public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +responseData.setThrottleTimeMs(throttleTimeMs); Review Comment: > Could we redirect getErrorResponse to here and rename it to errorResponse? Done. > If error code is not ThrottlingQuotaExceededException, we should ignore throttleTimeMs. This may be a naive question, but I didn't understand why throttleTimeMs should not be passed in the response for other exceptions. Don't all requests go through the common throttling code, where requests might be throttled for some time based on throughput? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391496243 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import 
org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: The challenge is that the number of connections can vary depending on the type of broker host. Larger instances can typically accommodate more connections. Do we need to make this an LRU cache? If a client is terminated or idle, we should remove it from the cache. Otherwise, we should probably just rely on the existing `max.connections` to control the client connections?
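The alternative raised in the comment above (dropping terminated or idle clients instead of bounding the cache by a fixed size) could look roughly like the following. This is only a sketch under stated assumptions: `IdleExpiringCache`, `maxIdleMs` and `expireIdle` are hypothetical names that do not appear in the PR or in Kafka, and the caller is assumed to pass in the current time and to invoke `expireIdle` periodically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration only; not part of the PR or of Kafka. */
public class IdleExpiringCache<K, V> {

    /** Value plus the time it was last read or written. */
    private static final class Entry<V> {
        final V value;
        volatile long lastAccessMs;

        Entry(V value, long nowMs) {
            this.value = value;
            this.lastAccessMs = nowMs;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long maxIdleMs;

    public IdleExpiringCache(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    public void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    /** Returns the cached value and refreshes its last-access time, or null if absent. */
    public V get(K key, long nowMs) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        entry.lastAccessMs = nowMs;
        return entry.value;
    }

    /**
     * Removes every entry that has been idle for longer than maxIdleMs.
     * The returned count is exact only when no other thread mutates the cache concurrently.
     */
    public int expireIdle(long nowMs) {
        int before = entries.size();
        entries.values().removeIf(e -> nowMs - e.lastAccessMs > maxIdleMs);
        return before - entries.size();
    }

    public int size() {
        return entries.size();
    }
}
```

With this shape the memory used tracks the number of recently active clients rather than a constant chosen up front, at the cost of needing a periodic sweep.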
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391490591 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception)); +
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
 + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: Then should it be in the low 100Ks, i.e. 2^17 = 131072? I am not sure what the right value should be here. I know that cloud providers like MSK typically allow 3000 active connections per broker, and that this limit is higher for Confluent, but what is the typical number of active clients connecting to a broker?
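To make the sizing question concrete, here is a minimal sketch of what a size-bounded LRU cache does when more distinct clients show up than the bound allows. It uses only the JDK's `LinkedHashMap` in access order, not Kafka's `LRUCache` or `SynchronizedCache`; `BoundedLruSketch` and `lru` are hypothetical names, and the map is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of the PR or of Kafka. */
public class BoundedLruSketch {

    // The two candidate bounds mentioned in the thread.
    static final int SIZE_16K = 1 << 14;   // 16384
    static final int SIZE_128K = 1 << 17;  // 131072

    /** Builds an access-ordered map that evicts the least recently used entry beyond maxSize. */
    static <K, V> Map<K, V> lru(final int maxSize) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }
}
```

If the bound is smaller than the number of clients that are actually active, entries for live clients are evicted and have to be rebuilt on their next request, which is why the choice between 2^14 and 2^17 depends on how many connections a broker really serves.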
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389719566 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389709078 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
 + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: Then should it be in the low 100Ks, i.e. 2^17 = 131072? ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1387273811 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,12 +16,48 @@ */ package kafka.server; +import java.util.Collections; Review Comment: Should this class be in the same package as other client metric related classes like `DefaultClientTelemetryPayload`? ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. 
+if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. +*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext,
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1800097217 @junrao @hachikuji @AndrewJSchofield @mjsax Could I please get feedback on the PR?
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1382258656 ## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ## @@ -143,46 +152,43 @@ private static void validateProperties(Properties properties) { /** * Parses the client matching patterns and builds a map with entries that has * (PatternName, PatternValue) as the entries. - * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) + * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) * - * NOTES: - * Client match pattern splits the input into two parts separated by first occurrence of the character '=' + * NOTES: + * Client match pattern splits the input into two parts separated by first occurrence of the character '=' * - * @param patterns List of client matching pattern strings + * @param patterns List of client matching pattern strings * @return map of client matching pattern entries */ -public static Map parseMatchingPatterns(List patterns) { -Map patternsMap = new HashMap<>(); -if (patterns != null) { -patterns.forEach(pattern -> { -String[] nameValuePair = pattern.split("="); -if (nameValuePair.length != 2) { -throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); -} - -String param = nameValuePair[0].trim(); -String patternValue = nameValuePair[1].trim(); -if (isValidParam(param) && isValidRegExPattern(patternValue)) { -patternsMap.put(param, patternValue); -} else { -throw new InvalidConfigurationException("Illegal client matching pattern: " + pattern); -} -}); +public static Map parseMatchingPatterns(List patterns) { Review Comment: Moved from to to avoid reparsing of patterns.
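The idea of parsing each match pattern once and keeping the compiled regex around can be sketched as below. This is not the PR's implementation: `MatchPatternSketch` and `parse` are hypothetical names, the sketch throws `IllegalArgumentException` instead of Kafka's `InvalidConfigurationException` so that it stays self-contained, it skips the parameter-name validation (`isValidParam`) shown in the removed lines, and it splits on the first `=` as the Javadoc describes rather than requiring exactly two parts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

/** Hypothetical illustration only; not part of the PR or of Kafka. */
public class MatchPatternSketch {

    /** Turns entries such as "name = regex" into a map of name to compiled Pattern. */
    public static Map<String, Pattern> parse(List<String> patterns) {
        Map<String, Pattern> compiled = new HashMap<>();
        if (patterns == null) {
            return compiled;
        }
        for (String raw : patterns) {
            // Split on the first '=' only, so the regex itself may contain '='.
            int idx = raw.indexOf('=');
            if (idx < 0) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + raw);
            }
            String name = raw.substring(0, idx).trim();
            String value = raw.substring(idx + 1).trim();
            if (name.isEmpty() || value.isEmpty()) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + raw);
            }
            try {
                // Compile once here so later requests can reuse the Pattern.
                compiled.put(name, Pattern.compile(value));
            } catch (PatternSyntaxException e) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + raw, e);
            }
        }
        return compiled;
    }
}
```

A caller could then evaluate `compiled.get("client_software_version").matcher(version).matches()` on every telemetry request without recompiling the expression.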
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1382255993 ## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Client metric configuration related parameters and the supporting methods like validation, etc. are + * defined in this class. + * + * { + * + *name: Name supplied by CLI during the creation of the client metric subscription. 
+ *metrics: List of metric prefixes + *intervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + *match: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * + * } + * + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * + * "name" is a unique name for the subscription. This is used to identify the subscription in + * the broker. Ex: "METRICS-SUB" + * "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. + * + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +public class ClientMetricsConfigs extends AbstractConfig { Review Comment: Resolved conflicts after merging the upstream.
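The "metrics" and "match" semantics described in the Javadoc quoted above can be illustrated with a small sketch. This is not the code from the PR: the class `MatchPatternSketch` and its methods are hypothetical names, and it assumes each match entry has the form `key = regex` and that the whole attribute value must match the regex.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the ClientMetricsConfigs implementation.
final class MatchPatternSketch {

    // Parses "key = regex" entries. An empty list yields an empty map,
    // which matches() below treats as "match all clients".
    static Map<String, Pattern> parseMatch(List<String> entries) {
        Map<String, Pattern> patterns = new LinkedHashMap<>();
        for (String entry : entries) {
            String[] kv = entry.split("=", 2);
            if (kv.length != 2 || kv[0].trim().isEmpty() || kv[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Illegal match pattern: " + entry);
            }
            patterns.put(kv[0].trim(), Pattern.compile(kv[1].trim()));
        }
        return patterns;
    }

    // A client matches when every pattern matches the corresponding attribute.
    static boolean matches(Map<String, Pattern> patterns, Map<String, String> clientAttributes) {
        for (Map.Entry<String, Pattern> e : patterns.entrySet()) {
            String value = clientAttributes.get(e.getKey());
            if (value == null || !e.getValue().matcher(value).matches()) {
                return false;
            }
        }
        return true;
    }

    // Prefix match: an empty list subscribes to nothing, while a list
    // containing "" subscribes to everything (every name starts with "").
    static boolean isSubscribed(List<String> prefixes, String metricName) {
        for (String prefix : prefixes) {
            if (metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Pattern> patterns = parseMatch(
            List.of("client_software_name = Java", "client_software_version = 11.1.*"));
        Map<String, String> client = Map.of(
            "client_software_name", "Java", "client_software_version", "11.1.2");
        System.out.println(matches(patterns, client));
        System.out.println(isSubscribed(
            List.of("org.apache.kafka.producer.partition.queue."),
            "org.apache.kafka.producer.partition.queue.bytes"));
    }
}
```

Note that `11.1.*` is read as a regular expression here, so `.` matches any character; a stricter pattern such as `11\.1\..*` would be needed to exclude a version like `11.10`.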
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1382109647 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
+ */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; Review Comment: KIP-714 says: `This client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used to enforce the push interval rate-limiting. There is no persistence of client instance metrics state across broker restarts or between brokers.` I have started with an LRUCache with oldest-entry eviction, but will implement something similar to `Selector.IdleExpiryManager`, which cleans up old connections.
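The retention rule quoted from KIP-714, MAX(60*1000, PushIntervalMs * 3), can be sketched as a time-based expiry. This is a minimal, single-threaded illustration under stated assumptions, not the planned implementation: `InstanceExpirySketch`, `touch` and `expire` are hypothetical names, client instances are keyed by a plain string, and the caller supplies the current time.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the KIP-714 retention rule; not thread-safe.
final class InstanceExpirySketch {
    static final long MIN_RETENTION_MS = 60 * 1000L;

    // Client instance id -> absolute time (ms) after which its state may be dropped.
    private final Map<String, Long> deadlines = new HashMap<>();

    // MAX(60*1000, PushIntervalMs * 3), as quoted from the KIP.
    static long retentionMs(long pushIntervalMs) {
        return Math.max(MIN_RETENTION_MS, pushIntervalMs * 3);
    }

    // Records activity for a client instance and pushes its deadline forward.
    void touch(String clientInstanceId, long pushIntervalMs, long nowMs) {
        deadlines.put(clientInstanceId, nowMs + retentionMs(pushIntervalMs));
    }

    // Drops every instance whose deadline has passed; returns how many were removed.
    int expire(long nowMs) {
        int before = deadlines.size();
        deadlines.values().removeIf(deadline -> deadline <= nowMs);
        return before - deadlines.size();
    }

    int size() {
        return deadlines.size();
    }

    public static void main(String[] args) {
        InstanceExpirySketch sketch = new InstanceExpirySketch();
        sketch.touch("client-a", 5_000L, 0L);   // kept for the 60 s minimum
        sketch.touch("client-b", 30_000L, 0L);  // kept for 3 * 30 s
        System.out.println(sketch.expire(60_000L));
        System.out.println(sketch.size());
    }
}
```

Because the retention depends on each client's push interval, eviction order here follows per-entry deadlines rather than insertion or access order, which is the main difference from a size-bounded LRU cache.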
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1382109087 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
+ */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: The cache should hold the information of all connected clients, hence it should support the number of clients that can connect to a single broker. I have kept it high enough for now, but would welcome suggestions on the best approach. @AndrewJSchofield mentioned that the bound might be the `max.connections` broker config, but that config's upper bound is Int.MAX_VALUE, hence we need to think about a better upper bound here.
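To make the sizing concern concrete, here is a small sketch of what a fixed-size LRU cache does once more clients connect than it can hold. It uses `java.util.LinkedHashMap` as a stand-in and is not the cache class used in the PR; `BoundedLruSketch` is a hypothetical name and the capacity of 2 is chosen only to keep the example short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a size-bounded LRU cache; not thread-safe.
final class BoundedLruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    BoundedLruSketch(int maxSize) {
        super(16, 0.75f, true); // true = iterate in access order (least recent first)
        this.maxSize = maxSize;
    }

    // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }

    public static void main(String[] args) {
        BoundedLruSketch<String, Integer> cache = new BoundedLruSketch<>(2);
        cache.put("client-a", 1);
        cache.put("client-b", 2);
        cache.get("client-a");     // client-a becomes most recently used
        cache.put("client-c", 3);  // exceeds capacity, evicts client-b
        System.out.println(cache.keySet());
    }
}
```

With a bound smaller than the number of connected clients, state for a still-active client can be evicted, so its push-interval tracking would start over on its next request.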
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1382107961 ## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Client metric configuration related parameters and the supporting methods like validation, etc. are + * defined in this class. + * + * { + * + *name: Name supplied by CLI during the creation of the client metric subscription. 
+ *metrics: List of metric prefixes + *intervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + *match: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * + * } + * + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * + * "name" is a unique name for the subscription. This is used to identify the subscription in + * the broker. Ex: "METRICS-SUB" + * "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. + * + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +public class ClientMetricsConfigs extends AbstractConfig { Review Comment: This class largely overlaps with the PR where it was introduced: https://github.com/apache/kafka/pull/14621