Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-29 Thread via GitHub


junrao merged PR #14699:
URL: https://github.com/apache/kafka/pull/14699


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1832344298

   @junrao The test failures are not related to the changes.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-29 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1832108927

   Hi @junrao, I have triaged the test cases as per the latest run.
   
   
   `Existing`: Build / JDK 17 and Scala 2.13 / 
testMultiWorkerRestartOnlyConnector – 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
   2m 21s: https://issues.apache.org/jira/browse/KAFKA-15675
   
   `Existing`: Build / JDK 17 and Scala 2.13 / 
testRackAwareRangeAssignor(String).quorum=zk – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   39s: https://issues.apache.org/jira/browse/KAFKA-15020
   
   `Existing`: Build / JDK 17 and Scala 2.13 / testCoordinatorFailover(String, 
String).quorum=zk.groupProtocol=generic – kafka.api.SslConsumerTest
   8s: https://issues.apache.org/jira/browse/KAFKA-15920
   
   `Existing`: Build / JDK 17 and Scala 2.13 / 
testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest
   3s: https://issues.apache.org/jira/browse/KAFKA-15419
   
   `Existing`: Build / JDK 17 and Scala 2.13 / testFenceMultipleBrokers() – 
org.apache.kafka.controller.QuorumControllerTest
   51s: https://issues.apache.org/jira/browse/KAFKA-15898
   
   `Existing`: Build / JDK 21 and Scala 2.13 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   1m 42s: https://issues.apache.org/jira/browse/KAFKA-15523
   
   `Existing`: Build / JDK 21 and Scala 2.13 / 
testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s: https://issues.apache.org/jira/browse/KAFKA-15760
   
   `Existing`: Build / JDK 11 and Scala 2.13 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   3m 44s: https://issues.apache.org/jira/browse/KAFKA-15523
   
   `Triaged`: Build / JDK 11 and Scala 2.13 / testRestartReplication() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   3m 21s: https://issues.apache.org/jira/browse/KAFKA-15933
   
   `Triaged`: Build / JDK 11 and Scala 2.13 / 
testMultiConsumerStickyAssignor(String, 
String).quorum=kraft+kip848.groupProtocol=generic – 
kafka.api.PlaintextConsumerTest
   2m 12s: https://issues.apache.org/jira/browse/KAFKA-15934
   
   `Existing`: Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   4m 25s: https://issues.apache.org/jira/browse/KAFKA-14971
   
   `Triaged`: Build / JDK 8 and Scala 2.12 / testRestartReplication() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   2m 8s: https://issues.apache.org/jira/browse/KAFKA-15935
   
   `Existing`: Build / JDK 8 and Scala 2.12 / 
testNoCheckpointsIfNoRecordsAreMirrored() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   2m 59s: https://issues.apache.org/jira/browse/KAFKA-15699
   
   `Existing`: Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest
   3m 37s: https://issues.apache.org/jira/browse/KAFKA-15523
   
   `Existing`: Build / JDK 8 and Scala 2.12 / 
testMultiWorkerRestartOnlyConnector – 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
   2m 28s: https://issues.apache.org/jira/browse/KAFKA-15761
   
   `Existing`: Build / JDK 8 and Scala 2.12 / 
testMultiWorkerRestartOnlyConnector – 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
   2m 39s: https://issues.apache.org/jira/browse/KAFKA-15761
   
   `Triaged`: Build / JDK 8 and Scala 2.12 / testGetActiveTopics – 
org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest
   1m 50s: https://issues.apache.org/jira/browse/KAFKA-15936
   
   `Triaged`: Build / JDK 8 and Scala 2.12 / testTopicTrackingResetIsDisabled – 
org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest
   2m 37s: https://issues.apache.org/jira/browse/KAFKA-15936
   
   `Triaged`: Build / JDK 8 and Scala 2.12 / testTopicTrackingIsDisabled – 
org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest
   3m 4s: https://issues.apache.org/jira/browse/KAFKA-15936
   
   `Existing`: Build / JDK 8 and Scala 2.12 / testConnectorReconfiguration – 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
   1m 42s: https://issues.apache.org/jira/browse/KAFKA-14901
   
   `Existing`: Build / JDK 8 and Scala 2.12 / testSeparateOffsetsTopic – 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest
   3m 35s: https://issues.apache.org/jira/browse/KAFKA-14089
   
   `Triaged`: Build / JDK 8 and Scala 2.12 / 
testTopicDeletion(String).quorum=kraft – kafka.admin.RemoteTopicCrudTest
   1m 30s: https://issues.apache.org/jira/browse/KAFKA-15937
   
   `Triaged`: Build / JDK 8 and Scala 2.12 / 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-28 Thread via GitHub


apoorvmittal10 closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement 
metrics manager (KIP-714)
URL: https://github.com/apache/kafka/pull/14699





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-28 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1831074267

   > @apoorvmittal10 : It seems that the build for JDK 17 and Scala 2.13 didn't 
complete.
   
   Thanks @junrao. Strange to see the failure in `:clients:test`. I can see the unrelated failure below in the logs; I have retriggered the build.
   
   ```
   > Task :clients:test
   org.apache.kafka.common.network.SelectorTest.testConnectionsByClientMetric() 
failed, log available in 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-14699/clients/build/reports/testOutput/org.apache.kafka.common.network.SelectorTest.testConnectionsByClientMetric().test.stdout
   
   Gradle Test Run :clients:test > Gradle Test Executor 13 > SelectorTest > 
testConnectionsByClientMetric() FAILED
   java.util.concurrent.TimeoutException: testConnectionsByClientMetric() 
timed out after 240 seconds
   at 
org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
   at 
org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
   at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
   at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
   at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
   at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
   at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
   at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
   at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
   at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
   at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
   at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
   at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
   at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
   at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
   at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-28 Thread via GitHub


junrao commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1830813531

   @apoorvmittal10 : It seems that the build for JDK 17 and Scala 2.13 didn't 
complete.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-28 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1830450426

   > Just merged #14632. Triggering another test run to make sure there are no 
new issues.
   
   Thanks @junrao.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-28 Thread via GitHub


junrao commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1830354235

   Just merged https://github.com/apache/kafka/pull/14632. Triggering another 
test run to make sure there are no new issues.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-28 Thread via GitHub


junrao closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement metrics 
manager (KIP-714)
URL: https://github.com/apache/kafka/pull/14699





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-27 Thread via GitHub


junrao commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1828329099

   @apoorvmittal10 : Thanks for triaging the tests. On the mailing list, it seems that we are still leaning towards requiring green builds before merging a PR.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-25 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1826397165

   @junrao @AndrewJSchofield Thanks for the LGTM and for approving the PR. Below is the test run status; none of the failing tests are related to the changes in the PR, but I have tried to debug them further. Of the 20 failures, 17 have already been reported as flaky tests in Jira; for the remaining 3, I tried to reproduce them locally but couldn't, so I created `flaky test` Jiras for them. Can we merge the PR?
   
   New failing - 20
   
   `Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / 
testResetSinkConnectorOffsetsZombieSinkTasks – 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
   1m 2s: https://issues.apache.org/jira/browse/KAFKA-15524
   
   `Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / 
testRackAwareRangeAssignor(String).quorum=kraft – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   8s: https://issues.apache.org/jira/browse/KAFKA-15020
   
   `Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / 
testRackAwareRangeAssignor(String).quorum=kraft – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   6s: https://issues.apache.org/jira/browse/KAFKA-15020
   
   `Already existing jira for flaky test`: Build / JDK 11 and Scala 2.13 / 
testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s: https://issues.apache.org/jira/browse/KAFKA-15760
   
   `Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / 
testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   2m 0s: https://issues.apache.org/jira/browse/KAFKA-15292
   
   `Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / 
testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest
   31s: https://issues.apache.org/jira/browse/KAFKA-15146
   
   `Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / 
testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest
   4s: https://issues.apache.org/jira/browse/KAFKA-15759
   
   `Already existing jira for flaky test`: Build / JDK 21 and Scala 2.13 / 
testSendOffsetsWithGroupId(String).quorum=zk – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   1m 15s: https://issues.apache.org/jira/browse/KAFKA-15772
   
   `Created Jira, cannot reproduce locally (other flaky tests from the same class also exist and have already been reported by others)`: Build / JDK 21 and Scala 2.13 / shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest
   4s: Another test in the same class is flaky: https://issues.apache.org/jira/browse/KAFKA-9897; created Jira: https://issues.apache.org/jira/browse/KAFKA-15896
   
   `Already existing jira for flaky test`: Build / JDK 17 and Scala 2.13 / 
testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   2m 23s: https://issues.apache.org/jira/browse/KAFKA-15699
   
   `Already existing jira for flaky test`: Build / JDK 17 and Scala 2.13 / 
testMultiWorkerRestartOnlyConnector – 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
   2m 33s: https://issues.apache.org/jira/browse/KAFKA-15675
   
   `Already existing jira for flaky test`: Build / JDK 17 and Scala 2.13 / 
testDescribeTokenForOtherUserPasses(String).quorum=kraft – 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
   9s: https://issues.apache.org/jira/browse/KAFKA-15411
   
   `Created flaky test jira, cannot reproduce locally`: Build / JDK 17 and 
Scala 2.13 / testWrongIncarnationId() – 
kafka.server.ControllerRegistrationManagerTest
   30s: https://issues.apache.org/jira/browse/KAFKA-15897
   
   `Created flaky test jira, cannot reproduce locally`: Build / JDK 17 and 
Scala 2.13 / testFenceMultipleBrokers() – 
org.apache.kafka.controller.QuorumControllerTest
   43s: https://issues.apache.org/jira/browse/KAFKA-15898
   
   `Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / 
testSyncTopicConfigs() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
   2m 33s: https://issues.apache.org/jira/browse/KAFKA-14971
   
   `Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / [2] 
quorum=kraft, isIdempotenceEnabled=false – 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
   10s: https://issues.apache.org/jira/browse/KAFKA-15411
   
   `Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / 
testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest
   4s: https://issues.apache.org/jira/browse/KAFKA-15419
   
   `Already existing jira for flaky test`: Build / JDK 8 and Scala 2.12 / 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-24 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1404286606


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -386,7 +388,9 @@ public enum Errors {
 STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry 
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", 
StaleMemberEpochException::new),
 MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the 
wrong type.", MismatchedEndpointTypeException::new),
 UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", 
UnsupportedEndpointTypeException::new),
-UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new);
+UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new),
+UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an 
invalid or outdated subscription ID", UnknownSubscriptionIdException::new),

Review Comment:
   Done, thanks for pointing that out.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1404253943


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -386,7 +388,9 @@ public enum Errors {
 STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry 
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", 
StaleMemberEpochException::new),
 MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the 
wrong type.", MismatchedEndpointTypeException::new),
 UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", 
UnsupportedEndpointTypeException::new),
-UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new);
+UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", 
UnknownControllerIdException::new),
+UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an 
invalid or outdated subscription ID", UnknownSubscriptionIdException::new),

Review Comment:
   Tiny change request. Please put a full stop at the end of the error strings. 
`"Client sent a push telemetry request with an invalid or outdated subscription 
ID."`.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement 
metrics manager (KIP-714)
URL: https://github.com/apache/kafka/pull/14699





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1822604515

   > @apoorvmittal10 : Thanks for the updated PR. LGTM. Are the 34 test 
failures related?
   
   Thanks @junrao, the test failures are not related, and I'll trigger a re-build to see if the failures persist.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1820280356

   Thanks @junrao for explaining the details. I have updated the PR and removed throttleMs from ClientMetricsManager. I have also filed a Jira to add the corresponding throttling changes in the QuotaManager.
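   
   To make the scope of this change concrete, below is a minimal, hypothetical sketch (not the actual merged code): the manager's request-processing methods no longer take a throttleMs parameter, and the response throttle time is instead applied by the broker's quota layer. The interface name `ClientMetricsRequestHandler` is invented purely for illustration; the request/response types are the ones imported in the diffs quoted elsewhere in this thread.
   
   ```
   import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
   import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
   import org.apache.kafka.common.requests.PushTelemetryRequest;
   import org.apache.kafka.common.requests.PushTelemetryResponse;
   import org.apache.kafka.common.requests.RequestContext;
   
   // Hypothetical sketch only: illustrates dropping throttleMs from the manager's API.
   public interface ClientMetricsRequestHandler {
   
       // Previously (per the diffs quoted in this thread) these methods also took an
       // int throttleMs; after this change, throttling is expected to be handled by
       // the broker's QuotaManager when the response is sent, so the parameter is gone.
       GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
           GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext);
   
       PushTelemetryResponse processPushTelemetryRequest(
           PushTelemetryRequest request, int telemetryMaxBytes, RequestContext requestContext);
   }
   ```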





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1400049044


##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -103,9 +103,16 @@ class DynamicConfigPublisher(
   )
 case CLIENT_METRICS =>
   // Apply changes to client metrics subscription.
-  info(s"Updating client metrics subscription ${resource.name()} 
with new configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  
dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(),
 props)
+  
dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler
 =>
+try {
+  info(s"Updating client metrics ${resource.name()} with new 
configuration : " +
+toLoggableProps(resource, props).mkString(","))

Review Comment:
   Done.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1400048884


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1400045702


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,420 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;

Review Comment:
   As I received a merge conflict in the current PR, I merged the upstream changes, which required moving the classes to the right package. I have done this in the current PR itself.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1399714279


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817955171

   Thanks for the review @junrao. I have updated the PR. 





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398480299


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398480068


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479956


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479915


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479399


##
core/src/main/java/kafka/metrics/ClientMetricsInstance.java:
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Contains the metrics instance metadata and the state of the client instance.
+ */
+public class ClientMetricsInstance {
+
+private final Uuid clientInstanceId;
+private final ClientMetricsInstanceMetadata instanceMetadata;
+private final int subscriptionId;
+private final int subscriptionVersion;
+private final Set metrics;
+private final int pushIntervalMs;
+
+private long lastGetRequestEpoch;

Review Comment:
   Done.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479369


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##
@@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ClientMetricsManagerTest.class);
+
+private Properties props;
+private KafkaConfig config;
+private MockTime time;
+private ClientMetricsManager clientMetricsManager;
+
+@BeforeEach
+public void setUp() {
+props = TestUtils.createDummyBrokerConfig();
+props.setProperty(KafkaConfig.ClientTelemetryMaxBytesProp(), "100");
+config = new KafkaConfig(props);
+time = new MockTime();
+clientMetricsManager = new ClientMetricsManager(config, time);
+}
+
+@Test
+public void testUpdateSubscription() {
+assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+assertEquals(0, clientMetricsManager.subscriptionUpdateVersion());
+clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+
+assertEquals(1, clientMetricsManager.subscriptions().size());
+assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+SubscriptionInfo subscriptionInfo = 
clientMetricsManager.subscriptionInfo("sub-1");
+Set<String> metrics = subscriptionInfo.metrics();
+
+// Validate metrics.
+assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, 
metrics.size());
+
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric 
->
+assertTrue(metrics.contains(metric)));
+// Validate push interval.
+
assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+String.valueOf(subscriptionInfo.intervalMs()));
+
+// Validate match patterns.
+
assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+subscriptionInfo.matchPattern().size());
+ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern 
-> {
+String[] split = pattern.split("=");
+assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+assertEquals(split[1], 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479353


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+
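
The `subscriptionUpdateVersion` counter in the diff above is a plain version
stamp: every subscription change bumps it, and a cached client instance is
re-evaluated once the version it was built against no longer matches. A
minimal, self-contained sketch of that pattern (illustrative names, not the
PR's code):

    import java.util.concurrent.atomic.AtomicInteger;

    public class SubscriptionVersionSketch {
        private final AtomicInteger subscriptionUpdateVersion = new AtomicInteger(0);

        // Called whenever a subscription is added, updated or removed.
        int onSubscriptionChanged() {
            return subscriptionUpdateVersion.incrementAndGet();
        }

        // A client instance evaluated at clientVersion is stale once the counter moves on.
        boolean needsReevaluation(int clientVersion) {
            return clientVersion != subscriptionUpdateVersion.get();
        }

        public static void main(String[] args) {
            SubscriptionVersionSketch sketch = new SubscriptionVersionSketch();
            int clientVersion = sketch.subscriptionUpdateVersion.get(); // evaluated at version 0
            System.out.println(sketch.needsReevaluation(clientVersion)); // false
            sketch.onSubscriptionChanged();                              // subscription updated
            System.out.println(sketch.needsReevaluation(clientVersion)); // true
        }
    }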

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479319


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+
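
The client instance cache in the diff above is an LRU cache wrapped in a
synchronized decorator and capped at CM_CACHE_MAX_SIZE. A small sketch of that
construction using Kafka's `org.apache.kafka.common.cache` classes (a String
value type is used here only to keep the example self-contained):

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.cache.Cache;
    import org.apache.kafka.common.cache.LRUCache;
    import org.apache.kafka.common.cache.SynchronizedCache;

    public class ClientInstanceCacheSketch {
        public static void main(String[] args) {
            // Thread-safe, capacity-bounded cache: the least recently used entry is
            // evicted once more entries are tracked than the configured maximum.
            Cache<Uuid, String> cache = new SynchronizedCache<>(new LRUCache<>(2));
            cache.put(Uuid.randomUuid(), "instance-a");
            cache.put(Uuid.randomUuid(), "instance-b");
            cache.put(Uuid.randomUuid(), "instance-c"); // exceeds capacity, evicts the oldest entry
            System.out.println(cache.size()); // prints 2
        }
    }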

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398477241


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397751986


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817091571

   Thanks @junrao for leaving the comments, I have tried to address them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397875576


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), 
instanceMetadata, 0, 0,
+null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+}
+
+@Test
+public void testMaybeUpdateRequestEpochValid() {
+// First request should be accepted.
+
assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));

Review Comment:
   As the class/methods I am testing do not hold a `Time` reference and simply 
update the epoch supplied, I didn't see any value in using MockTime there. 
However, I did change the code and tests for ClientMetricsManager, where these 
methods are invoked from: I started using Time in ClientMetricsManager and the 
corresponding MockTime in ClientMetricsManagerTest, which eliminated the use of 
`Thread.sleep`.
   
   Please let me know if I am missing anything here.
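
For reference, the MockTime-based approach described here advances a virtual
clock instead of blocking the test thread. A tiny sketch, assuming only Kafka's
test utility `org.apache.kafka.common.utils.MockTime`:

    import org.apache.kafka.common.utils.MockTime;

    public class MockTimeSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();
            long start = time.milliseconds();
            // Advances the mock clock immediately; no real waiting as with Thread.sleep.
            time.sleep(30_000);
            System.out.println("elapsed ms: " + (time.milliseconds() - start)); // 30000
        }
    }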



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397873419


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   I have moved the `ClientMetricsManager` static instance and removed the code 
from ControllerServer.scala; now initialization only happens in 
`BrokerServer.scala`. Please let me know your thoughts on 
`ClientMetricsReceiverPlugin`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397871398


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior to the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   I have removed the conditional logic in this class and set `0` as the 
throttle time in case of error in ClientMetricsManager. Done.
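
A sketch of what that caller-side convention amounts to (not the PR's exact
code, the helper name is illustrative): the error path builds the response
itself and always passes 0 for the throttle time, so a telemetry-specific error
never mutes the client channel.

    import org.apache.kafka.common.message.PushTelemetryResponseData;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.PushTelemetryResponse;

    public class TelemetryErrorResponseSketch {
        // On any error path the caller passes 0 for throttleTimeMs, as discussed above.
        static PushTelemetryResponse errorResponse(Errors error) {
            PushTelemetryResponseData data = new PushTelemetryResponseData()
                .setErrorCode(error.code())
                .setThrottleTimeMs(0);
            return new PushTelemetryResponse(data);
        }

        public static void main(String[] args) {
            PushTelemetryResponse response = errorResponse(Errors.THROTTLING_QUOTA_EXCEEDED);
            System.out.println(response.data().errorCode() + ", throttleTimeMs="
                + response.data().throttleTimeMs());
        }
    }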



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior to the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   I noticed that sometimes we do set `throttleTimeMs` with an error. However, 
this only happens in KafkaApis with the generic request throttling logic. So, 
for now, I'd recommend that we get rid of this check here and rely on the 
caller to set `throttleTimeMs` properly. Specifically, for all 
`errorResponse(int throttleTimeMs, Errors errors)` and `getErrorResponse(int 
throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass 
in 0 for `throttleTimeMs`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior to the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   As discussed earlier, for `THROTTLING_QUOTA_EXCEEDED`, we shouldn't set  
`throttleTimeMs` since we don't want the client to mute the channel for all 
requests.
   
   Also, I noticed that sometimes we do set `throttleTimeMs` with an error. 
However, this only happens in KafkaApis with the generic request throttling 
logic. So, for now, I'd recommend that we get rid of this check here and rely 
on the caller to set `throttleTimeMs` properly. For example, for all 
`errorResponse(int throttleTimeMs, Errors errors)` and `getErrorResponse(int 
throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass 
in 0 for `throttleTimeMs`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397795152


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   Yeah, I re-checked the code today and we shouldn't require any change in 
ControllerServer.scala, as the APIs are not supported on the controller and we 
won't require even KIP-1000 to be supported on controllers. I am making the 
change to initialize ClientMetricsManager only from KafkaServer.scala.
   
   What do you think about `ClientMetricsReceiverPlugin` based on the usage in 
the tagged PR above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-17 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));
 }
 
 @Override
 public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+/*
+ THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the 
telemetry request
+ arrives prior to the configured minimum interval between two consecutive 
telemetry requests.
+ In this case, the throttleTimeMs should not be included in the 
response to avoid the
+ muting of the client channel for all the other requests.
+*/
+if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) {

Review Comment:
   We shouldn't set throttleTimeMs even for Errors.THROTTLING_QUOTA_EXCEEDED. 
throttleTimeMs can only be set when there is no error.



##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   Hmm, why is ClientMetricsManager needed in the controller? I thought only 
brokers can receive get/push telemetry requests.



##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396501916


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());

Review Comment:
   Yes, I just meant merging lines 37 and 38.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396500493


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, 
short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-PushTelemetryResponseData responseData = new 
PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
   Got it. Since getErrorResponse is an override of a base method, we can leave 
it as it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1815015828

   Thanks for reviewing @junrao, I have updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396171173


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), 
instanceMetadata, 0, 0,
+null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+}
+
+@Test
+public void testMaybeUpdateRequestEpochValid() {
+// First request should be accepted.
+
assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+
assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+}
+
+@Test
+public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws 
InterruptedException {
+ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, 
instanceMetadata, 0, 0,
+null, 2);
+
assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+// sleep for 3 ms to ensure that the next request is accepted.
+Thread.sleep(3);
+// Second request should be accepted as time since last request is 
greater than the retry interval.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396170895


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changed subscriptions.
+private volatile long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changed subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {

Review Comment:
   I have moved the check to Uuid.RESERVED, done.
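
For illustration, a minimal sketch of the broadened check described here, built
on `org.apache.kafka.common.Uuid` (the helper name is made up):

    import org.apache.kafka.common.Uuid;

    public class ClientInstanceIdCheckSketch {
        // Reject null and any reserved id rather than only ZERO_UUID.
        static boolean isValidClientInstanceId(Uuid clientInstanceId) {
            return clientInstanceId != null && !Uuid.RESERVED.contains(clientInstanceId);
        }

        public static void main(String[] args) {
            System.out.println(isValidClientInstanceId(Uuid.ZERO_UUID));    // false
            System.out.println(isValidClientInstanceId(Uuid.randomUuid())); // true
        }
    }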



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396168530


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+private ClientMetricsManager clientMetricsManager;
+
+@BeforeEach
+public void setUp() {
+clientMetricsManager = new ClientMetricsManager();
+}
+
+@Test
+public void testUpdateSubscription() {
+assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+long previousEpoch = 
clientMetricsManager.lastSubscriptionUpdateEpoch();
+clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+
+assertEquals(1, clientMetricsManager.subscriptions().size());
+assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+SubscriptionInfo subscriptionInfo = 
clientMetricsManager.subscriptionInfo("sub-1");
+Set<String> metrics = subscriptionInfo.metrics();
+
+// Validate metrics.
+assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, 
metrics.size());
+
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric 
->
+assertTrue(metrics.contains(metric)));
+// Validate push interval.
+
assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+String.valueOf(subscriptionInfo.intervalMs()));
+
+// Validate match patterns.
+
assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+subscriptionInfo.matchPattern().size());
+ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern 
-> {
+String[] split = pattern.split("=");
+assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+assertEquals(split[1], 
subscriptionInfo.matchPattern().get(split[0]).pattern());
+});
+assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > 
previousEpoch);
+}
+
+@Test
+public void testUpdateSubscriptionWithEmptyProperties() {
+assertTrue(clientMetricsManager.subscriptions().isEmpty());
+long previousEpoch = 
clientMetricsManager.lastSubscriptionUpdateEpoch();
+clientMetricsManager.updateSubscription("sub-1", new Properties());
+// No subscription should be added as the properties are empty.
+assertEquals(0, clientMetricsManager.subscriptions().size());
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396168208


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+
+// The last subscription updated time is used to determine if the next telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private volatile long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+// for respective subscription. In that case, we need to remove the subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate that there is a change
+ in the subscription. This will be used to determine if the next telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If subscription has changed
+ since the last request, then the client instance will be re-evaluated. Validation of the
+ request will be done after the client instance is created. If client issued get telemetry
+ request prior to push interval, then the client should get a throttle error but if the
+ subscription has changed since the last request then the client should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396167813


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396165846


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396164589


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+}
+
+@Test
+public void testMaybeUpdateRequestEpochValid() {
+// First request should be accepted.
+assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+}
+
+@Test
+public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+null, 2);
+assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+// sleep for 3 ms to ensure that the next request is accepted.
+Thread.sleep(3);
+// Second request should be accepted as time since last request is greater than the retry interval.
+assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+}
+
+@Test
+public void testMaybeUpdateGetRequestWithImmediateRetryFail() {
+assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+// Second request should be rejected as time since last request is less than the retry interval.
+assertFalse(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+}
+
+@Test
+public void testMaybeUpdatePushRequestAfterElapsedTimeValid() throws InterruptedException {

Review Comment:
   The methods independently check for updating getTelemetryTime and
pushTelemetryTime respectively. Is the concern more that we could merge
them into a single test? As I have now removed the sleep from the methods,
do you think it's fine to keep them separate?



##
core/src/test/java/kafka/metrics/ClientMetricsReceiverPluginTest.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import kafka.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396161493


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0,
+null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+}
+
+@Test
+public void testMaybeUpdateRequestEpochValid() {
+// First request should be accepted.
+assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+assertTrue(clientInstance.maybeUpdatePushRequestEpoch(System.currentTimeMillis()));
+}
+
+@Test
+public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws InterruptedException {
+ClientMetricsInstance clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 0,
+null, 2);
+assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis()));
+// sleep for 3 ms to ensure that the next request is accepted.
+Thread.sleep(3);

Review Comment:
   I have changed the code so we no longer require sleep in the tests, done.
I have also started using MockTime in ClientMetricsManager.
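
   For illustration only (a hedged sketch, not the PR's final code), a sleep-free
version of such a test can drive the check with explicit timestamps, assuming
`maybeUpdateGetRequestEpoch` compares the supplied timestamp against the last
accepted request epoch; the fixed epoch values below are made up:

       @Test
       public void testMaybeUpdateGetRequestAfterElapsedTimeValid() throws UnknownHostException {
           ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(
               Uuid.randomUuid(), ClientMetricsTestUtils.requestContext());
           // Push interval of 2 ms, as in the original test.
           ClientMetricsInstance clientInstance = new ClientMetricsInstance(
               Uuid.randomUuid(), instanceMetadata, 0, 0, null, 2);

           long start = 1_000L;                                           // fixed epoch, no wall clock
           assertTrue(clientInstance.maybeUpdateGetRequestEpoch(start));  // first request accepted
           // 3 ms later, past the 2 ms interval, so the request should be accepted again.
           assertTrue(clientInstance.maybeUpdateGetRequestEpoch(start + 3));
       }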



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396160383


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceMetadataTest.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceMetadataTest {
+
+@Test
+public void testIsMatchValid() throws UnknownHostException {
+Uuid uuid = Uuid.randomUuid();
+ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+// We consider empty/missing client matching patterns as valid
+assertTrue(instanceMetadata.isMatch(Collections.emptyMap()));
+assertTrue(instanceMetadata.isMatch(null));

Review Comment:
   Makes sense, the user cannot. I have removed this test condition and the
null check in the code as well.
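
   For context, a rough sketch of the shape `isMatch` takes once the null check
is dropped (the `attributesMap` field name is an assumption made for the
sketch, not necessarily the PR's actual field):

       public boolean isMatch(Map<String, Pattern> patterns) {
           if (patterns.isEmpty()) {
               // Empty/missing match patterns select every client instance.
               return true;
           }
           for (Map.Entry<String, Pattern> entry : patterns.entrySet()) {
               String attributeValue = attributesMap.get(entry.getKey());
               if (attributeValue == null || !entry.getValue().matcher(attributeValue).matches()) {
                   return false;
               }
           }
           return true;
       }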



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396159445


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -80,6 +82,11 @@ public class ClientMetricsConfigs {
 public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
 public static final String CLIENT_SOURCE_PORT = "client_source_port";
 
+// Empty string in client-metrics resource configs indicates that all the metrics are subscribed.
+public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

Review Comment:
   I have moved to '*' as the identifier, done.
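
   For illustration, a minimal sketch of how the new marker could be declared
and checked (the helper method name is made up for the sketch):

       // '*' in client-metrics resource configs indicates that all the metrics are subscribed.
       public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "*";

       private static boolean isAllMetricsSubscribed(List<String> metricNames) {
           return metricNames.size() == 1 && ALL_SUBSCRIBED_METRICS_CONFIG.equals(metricNames.get(0));
       }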



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396158662


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+
+// The last subscription updated time is used to determine if the next telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+// for respective subscription. In that case, we need to remove the subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate that there is a change
+ in the subscription. This will be used to determine if the next telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If subscription has changed
+ since the last request, then the client instance will be re-evaluated. Validation of the
+ request will be done after the client instance is created. If client issued get telemetry
+ request prior to push interval, then the client should get a throttle error but if the
+ subscription has changed since the last request then the client should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new InvalidRequestException(msg));

Review Comment:
   I have removed the throttle time for THROTTLING_QUOTA_EXCEEDED as per the
explanation in the other comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396158035


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
+PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);

Review Comment:
   Thanks for the explanation @junrao, this is helpful. I have removed 
throttleTimeMs for the case when THROTTLING_QUOTA_EXCEEDED exception is thrown 
in telemetry APIs. Done.
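
   For illustration only, a hedged sketch of what `createResponse` could look
like after that change, built from the setters in the quoted diff (not
necessarily the exact final code):

       public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
           PushTelemetryResponseData responseData = new PushTelemetryResponseData()
               .setErrorCode(errors.code());
           // Throttle time is no longer attached when the quota error itself is returned.
           if (errors != Errors.THROTTLING_QUOTA_EXCEEDED) {
               responseData.setThrottleTimeMs(throttleTimeMs);
           }
           return new PushTelemetryResponse(responseData);
       }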



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396154104


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+private final Map<String, SubscriptionInfo> subscriptionMap;
+
+// The last subscription updated time is used to determine if the next telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+// for respective subscription. In that case, we need to remove the subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate that there is a change
+ in the subscription. This will be used to determine if the next telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If subscription has changed
+ since the last request, then the client instance will be re-evaluated. Validation of the
+ request will be done after the client instance is created. If client issued get telemetry
+ request prior to push interval, then the client should get a throttle error but if the
+ subscription has changed since the last request then the client should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396148679


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin();

Review Comment:
   Makes sense. For `ClientMetricsManager`, the class needs to be available in
BrokerServer.scala and ControllerServer.scala, as the client-metrics resource
reload has to be managed by a single instance. Shall I add the changes in
SharedServer.scala to share the same instance?
   
   For ClientMetricsReceiverPlugin, here is the PR that uses it:
https://github.com/apache/kafka/pull/14767/files#diff-019dd06900a2f4714d626ffb071466765413970462f36a86e8a37c0ef9c0afb9;
as the class is required while defining DynamicBrokerConfig, I thought to
make it a singleton as well.
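
   To make the alternative concrete, a rough sketch of the non-singleton shape
(the method names here are illustrative, not the PR's API): the plugin would be
constructed once, e.g. alongside SharedServer, and passed to the components
that need it instead of being reached through a static INSTANCE.

       public class ClientMetricsReceiverPlugin {

           private final List<ClientTelemetryReceiver> receivers = new ArrayList<>();

           public void add(ClientTelemetryReceiver receiver) {
               receivers.add(receiver);
           }

           public boolean isEmpty() {
               return receivers.isEmpty();
           }
       }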



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1396143360


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-PushTelemetryResponseData responseData = new PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
   I might have missed something here. The method to override has to be
`getErrorResponse`, and I just created another method, `errorResponse`, which
helps build the response. Is there anything I need to address here?
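
   In other words, the shape in the diff is roughly the following; the
`errorResponse` body here just mirrors the previous `getErrorResponse` body and
may not match the PR exactly:

       @Override
       public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
           // Overridden entry point keeps its signature and delegates to the helper.
           return errorResponse(throttleTimeMs, Errors.forException(e));
       }

       public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) {
           PushTelemetryResponseData responseData = new PushTelemetryResponseData()
               .setErrorCode(errors.code())
               .setThrottleTimeMs(throttleTimeMs);
           return new PushTelemetryResponse(responseData);
       }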



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-16 Thread via GitHub


AndrewJSchofield commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395583845


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceMetadataTest.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceMetadataTest {
+
+@Test
+public void testIsMatchValid() throws UnknownHostException {
+Uuid uuid = Uuid.randomUuid();
+ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());
+// We consider empty/missing client matching patterns as valid
+assertTrue(instanceMetadata.isMatch(Collections.emptyMap()));
+assertTrue(instanceMetadata.isMatch(null));

Review Comment:
   Good question. `kafka-client-metrics.sh --alter --name CM1 --match ""`
deletes the map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-15 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395220263


##
core/src/main/java/kafka/metrics/ClientMetricsInstance.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Contains the metrics instance metadata and the state of the client instance.
+ */
+public class ClientMetricsInstance {
+
+private final Uuid clientInstanceId;
+private final ClientMetricsInstanceMetadata instanceMetadata;
+private final int subscriptionId;
+private final long subscriptionUpdateEpoch;
+private final Set<String> metrics;
+private final int pushIntervalMs;
+
+private long lastGetRequestEpoch;
+private long lastPushRequestEpoch;
+private volatile boolean terminating;
+private volatile Errors lastKnownError;
+
+public ClientMetricsInstance(Uuid clientInstanceId, ClientMetricsInstanceMetadata instanceMetadata,
+int subscriptionId, long subscriptionUpdateEpoch, Set<String> metrics, int pushIntervalMs) {
+this.clientInstanceId = Objects.requireNonNull(clientInstanceId);
+this.instanceMetadata = Objects.requireNonNull(instanceMetadata);
+this.subscriptionId = subscriptionId;
+this.subscriptionUpdateEpoch = subscriptionUpdateEpoch;
+this.metrics = metrics;
+this.terminating = false;
+this.pushIntervalMs = pushIntervalMs;
+this.lastKnownError = Errors.NONE;
+}
+
+public Uuid clientInstanceId() {
+return clientInstanceId;
+}
+
+public ClientMetricsInstanceMetadata instanceMetadata() {
+return instanceMetadata;
+}
+
+public int pushIntervalMs() {
+return pushIntervalMs;
+}
+
+public long subscriptionUpdateEpoch() {
+return subscriptionUpdateEpoch;
+}
+
+public int subscriptionId() {
+return subscriptionId;
+}
+
+public Set<String> metrics() {
+return metrics;
+}
+
+public boolean terminating() {
+return terminating;

Review Comment:
   The reason I didn't remove the instance from the in-memory cache is that
subsequent requests from a terminated client should be rejected. Having said
that, the terminated client should also not remain in memory forever, so it
should be removed from the cache as per the cache's eviction policy, i.e.
`MAX(60*1000, PushIntervalMs * 3)` milliseconds. This will be inherently
handled by the cache improvement task, where we have a time-based eviction
policy.
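
   For illustration, the eviction bound mentioned above as a small helper (the
names are made up for the sketch):

       private static final long MIN_CACHE_EVICTION_TTL_MS = 60 * 1000L;

       private static long cacheEvictionTtlMs(int pushIntervalMs) {
           // Keep a (possibly terminated) client instance at most MAX(60s, 3 * push interval).
           return Math.max(MIN_CACHE_EVICTION_TTL_MS, 3L * pushIntervalMs);
       }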



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-15 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395204343


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-15 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395171995


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-15 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395166691


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+private ClientMetricsManager clientMetricsManager;
+
+@BeforeEach
+public void setUp() {
+clientMetricsManager = new ClientMetricsManager();
+}
+
+@Test
+public void testUpdateSubscription() {
+assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultProperties());
+
+assertEquals(1, clientMetricsManager.subscriptions().size());
+assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+SubscriptionInfo subscriptionInfo = clientMetricsManager.subscriptionInfo("sub-1");
+Set<String> metrics = subscriptionInfo.metrics();
+
+// Validate metrics.
+assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, metrics.size());
+Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric ->
+assertTrue(metrics.contains(metric)));
+// Validate push interval.
+assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+String.valueOf(subscriptionInfo.intervalMs()));
+
+// Validate match patterns.
+assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+subscriptionInfo.matchPattern().size());
+ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern -> {
+String[] split = pattern.split("=");
+assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+assertEquals(split[1], subscriptionInfo.matchPattern().get(split[0]).pattern());
+});
+assertTrue(clientMetricsManager.lastSubscriptionUpdateEpoch() > previousEpoch);
+}
+
+@Test
+public void testUpdateSubscriptionWithEmptyProperties() {
+assertTrue(clientMetricsManager.subscriptions().isEmpty());
+long previousEpoch = clientMetricsManager.lastSubscriptionUpdateEpoch();
+clientMetricsManager.updateSubscription("sub-1", new Properties());
+// No subscription should be added as the properties are empty.
+assertEquals(0, clientMetricsManager.subscriptions().size());
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-15 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395156877


##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());

Review Comment:
   I have used the `uuid` variable later as well to check whether `match` is successful, hence kept it separate. But if the suggestion was to merge lines 37 and 38, I have done that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-15 Thread via GitHub


AndrewJSchofield commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1394744738


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -80,6 +82,11 @@ public class ClientMetricsConfigs {
 public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
 public static final String CLIENT_SOURCE_PORT = "client_source_port";
 
+// Empty string in client-metrics resource configs indicates that all the metrics are subscribed.
+public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

Review Comment:
   It would be possible to change the KIP to support a list containing just `*` 
to mean all metrics. The KIP today says a list containing just `""` means all 
metrics. That's a pretty simple change.
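   As a rough illustration of the two conventions being compared (a minimal sketch with assumed names, not the PR's parsing code), a broker-side helper could treat the empty string (and, if the KIP were amended as suggested, `*`) as "subscribe to all metrics":

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SubscribedMetricsParser {

    // Hypothetical sentinels: only "" is in the current KIP text; "*" is the suggested alternative.
    private static final String ALL_METRICS_EMPTY = "";
    private static final String ALL_METRICS_WILDCARD = "*";

    public static boolean subscribesAllMetrics(String configValue) {
        String trimmed = configValue == null ? "" : configValue.trim();
        return trimmed.equals(ALL_METRICS_EMPTY) || trimmed.equals(ALL_METRICS_WILDCARD);
    }

    public static List<String> parsePrefixes(String configValue) {
        if (subscribesAllMetrics(configValue)) {
            return Collections.singletonList(""); // the empty prefix matches every metric name
        }
        return Arrays.asList(configValue.split(","));
    }

    public static void main(String[] args) {
        System.out.println(parsePrefixes(""));                           // all metrics
        System.out.println(parsePrefixes("org.apache.kafka.producer.")); // single prefix
    }
}

   Either sentinel works mechanically; the trade-off is mostly about how readable the stored value is when listed back through `kafka-configs.sh`.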



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-14 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391737904


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) {
 
 @Override
 public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-PushTelemetryResponseData responseData = new PushTelemetryResponseData()
-.setErrorCode(Errors.forException(e).code())
-.setThrottleTimeMs(throttleTimeMs);
-return new PushTelemetryResponse(responseData);
+return errorResponse(throttleTimeMs, Errors.forException(e));

Review Comment:
   This is an existing issue, but `getErrorResponse` can just be 
`errorResponse`.



##
core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsInstanceTest {
+
+private Uuid uuid;
+private ClientMetricsInstanceMetadata instanceMetadata;
+private ClientMetricsInstance clientInstance;
+
+@BeforeEach
+public void setUp() throws UnknownHostException {
+uuid = Uuid.randomUuid();
+instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
+ClientMetricsTestUtils.requestContext());

Review Comment:
   merge with previous line?



##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##
@@ -0,0 +1,921 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1809013650

   > @apoorvmittal10 : Thanks for the PR. Made a pass of non-testing files. Left a few comments.
   
   Thanks a lot for the review @junrao. I have addressed the comments and have a question about throttleTimeMs for errors in the comment threads. Please re-review when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391660088


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final List<ClientTelemetryReceiver> RECEIVERS = new ArrayList<>();

Review Comment:
   Done.



##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659830


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659127


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658586


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658438


##
core/src/main/java/kafka/metrics/ClientMetricsInstance.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Contains the metrics instance metadata and the state of the client instance.
+ */
+public class ClientMetricsInstance {
+
+private final Uuid clientInstanceId;
+private final ClientMetricsInstanceMetadata instanceMetadata;
+private final int subscriptionId;
+private final long subscriptionUpdateEpoch;
+private final Set<String> metrics;
+private final int pushIntervalMs;
+
+private boolean terminating;
+private long lastGetRequestEpoch;
+private long lastPushRequestEpoch;
+private Errors lastKnownError;

Review Comment:
   I have marked 2 of them volatile and the other 2 are now protected with `synchronized`. I have also added concurrency tests to validate the behaviour that only a single request is processed when multiple are received for the same client instance; the others are rejected with a throttling error.
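   To make that pattern concrete, here is a minimal sketch (assumed names, not the PR's `ClientMetricsInstance`): volatile fields for state read without locking, plus a synchronized guard so only one telemetry request per client instance is admitted at a time and concurrent callers can be failed with a throttling error.

public class InFlightRequestGuard {

    // Volatile: written by the request thread that holds the slot, read by others without locking.
    private volatile long lastPushRequestTimestamp;
    private volatile boolean terminating;

    // Guarded by synchronized; tracks whether a request is currently being processed.
    private boolean requestInFlight;

    // Returns true only for the single caller that wins the slot.
    public synchronized boolean maybeBeginRequest() {
        if (requestInFlight) {
            return false; // caller should respond with a throttling error
        }
        requestInFlight = true;
        return true;
    }

    public synchronized void completeRequest(long nowMs) {
        lastPushRequestTimestamp = nowMs;
        requestInFlight = false;
    }

    public long lastPushRequestTimestamp() {
        return lastPushRequestTimestamp;
    }

    public boolean terminating() {
        return terminating;
    }

    public void terminating(boolean value) {
        this.terminating = value;
    }
}

   A concurrency test can then fire several threads at `maybeBeginRequest()` and assert that exactly one of them gets `true`, which mirrors the tests described above.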



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656533


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;

Review Comment:
   Makes sense, thanks. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656238


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391654569


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,12 +16,48 @@
  */
 package kafka.server;
 
+import java.util.Collections;

Review Comment:
   The reason I kept this in the `kafka.server` package is that all the managers (in Scala) that process API calls from KafkaApis.scala also reside in the `kafka.server` package.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391652781


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
+PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);
+return new PushTelemetryResponse(responseData);
+}
+
+public String getMetricsContentType() {

Review Comment:
   Missed these, thanks. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391651232


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391650483


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));

Review Comment:
   I have asked the same question in another comment, but shouldn't that be set there, since the API call goes through `sendMaybeThrottle` in KafkaApis, which throttles the request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391648051


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   > Do we need to make this an LRU cache? If a client is terminated or idle, we should remove them from the cache
   
   I have started with an LRU cache and plan to improve it with a cache that time-bounds each entry. The KIP says: `client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds`. I will add an improvement to the cache to respect that: https://issues.apache.org/jira/browse/KAFKA-15813
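   For reference, a minimal sketch (assumed names, not the KAFKA-15813 implementation) of the retention rule quoted from the KIP, where an entry expires after MAX(60 * 1000, PushIntervalMs * 3) milliseconds of inactivity:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClientInstanceStateCache {

    private static final long MIN_RETENTION_MS = 60 * 1000L;

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    static final class Entry {
        final int pushIntervalMs;
        volatile long lastAccessMs;

        Entry(int pushIntervalMs, long nowMs) {
            this.pushIntervalMs = pushIntervalMs;
            this.lastAccessMs = nowMs;
        }

        // Retention window per the KIP: MAX(60 * 1000, PushIntervalMs * 3).
        long retentionMs() {
            return Math.max(MIN_RETENTION_MS, 3L * pushIntervalMs);
        }
    }

    public void touch(String clientInstanceId, int pushIntervalMs, long nowMs) {
        entries.computeIfAbsent(clientInstanceId, id -> new Entry(pushIntervalMs, nowMs))
            .lastAccessMs = nowMs;
    }

    // Evicts entries that have been idle longer than their retention window.
    public void expireIdleEntries(long nowMs) {
        Iterator<Map.Entry<String, Entry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Entry e = it.next().getValue();
            if (nowMs - e.lastAccessMs > e.retentionMs()) {
                it.remove();
            }
        }
    }
}

   Something along these lines keeps a size bound in place while also dropping idle or terminated clients once their window lapses.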



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391640685


##
core/src/main/java/kafka/metrics/ClientMetricsInstanceMetadata.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Information from the client's metadata is gathered from the client's request.
+ */
+public class ClientMetricsInstanceMetadata {
+
+private final Map<String, String> attributesMap;
+
+public ClientMetricsInstanceMetadata(Uuid clientInstanceId, RequestContext requestContext) {
+Objects.requireNonNull(clientInstanceId);
+Objects.requireNonNull(requestContext);
+
+attributesMap = new HashMap<>();
+
+attributesMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+attributesMap.put(ClientMetricsConfigs.CLIENT_ID, requestContext.clientId());
+attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, requestContext.clientInformation != null ?
+requestContext.clientInformation.softwareName() : null);
+attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, requestContext.clientInformation != null ?
+requestContext.clientInformation.softwareVersion() : null);
+attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, requestContext.clientAddress != null ?
+requestContext.clientAddress.getHostAddress() : null);
+// KIP-714 mentions client source port should be the client 
connection's source port from the
+// broker's point of view. But the broker does not have this 
information rather the port could be
+// the broker's port where the client connection is established. We 
might want to consider removing
+// the client source port from the KIP or use broker port if that can 
be helpful.
+// TODO: fix port

Review Comment:
   Thanks a lot @junrao, this is helpful. I will make the changes in a subsequent 
PR to address this. I have created the following JIRA for it: 
https://issues.apache.org/jira/browse/KAFKA-15811



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391639061


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -80,6 +82,11 @@ public class ClientMetricsConfigs {
 public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
 public static final String CLIENT_SOURCE_PORT = "client_source_port";
 
+// Empty string in client-metrics resource configs indicates that all the 
metrics are subscribed.
+public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

Review Comment:
   The KIP mentions that in the subscription response this should be just an empty 
string, i.e. `""`. But when creating a `client-metrics` resource, ConfigDef parses 
`""` as no data, so to specify an empty string through `kafka-configs.sh` we 
need to pass it as an empty string enclosed in quotes, i.e. `"\"\""`.
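
   To make the escaping concrete, here is a small hedged sketch (the constant mirrors the one above; the `allMetricsSubscribed` helper is purely illustrative):

```java
import java.util.Collections;
import java.util.List;

public class AllMetricsFlagSketch {

    // The Java literal "\"\"" is the two-character string consisting of two
    // double-quote characters, i.e. the value that survives kafka-configs.sh
    // and ConfigDef parsing when an operator wants to express "empty string".
    static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

    // Hypothetical check: a metrics list containing only that quoted empty
    // string is interpreted as "all metrics subscribed".
    static boolean allMetricsSubscribed(List<String> metrics) {
        return metrics.equals(Collections.singletonList(ALL_SUBSCRIBED_METRICS_CONFIG));
    }

    public static void main(String[] args) {
        System.out.println(allMetricsSubscribed(Collections.singletonList("\"\""))); // true
        System.out.println(allMetricsSubscribed(Collections.emptyList()));           // false
    }
}
```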



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391636101


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);
+return new PushTelemetryResponse(responseData);
+}
+
+public String getMetricsContentType() {
+// Future versions of PushTelemetryRequest and 
GetTelemetrySubscriptionsRequest may include a content-type
+// field to allow for updated OTLP format versions (or additional 
formats), but this field is currently not
+// included since only one format is specified in the current proposal 
of the kip-714
+return OTLP_CONTENT_TYPE;
+}
+
+public ByteBuffer getMetricsData() {
+CompressionType cType = 
CompressionType.forId(this.data.compressionType());
+return (cType == CompressionType.NONE) ?
+ByteBuffer.wrap(this.data.metrics()) : 
decompressMetricsData(cType, this.data.metrics());
+}
+
+private static ByteBuffer decompressMetricsData(CompressionType 
compressionType, byte[] metrics) {
+// TODO: Add support for decompression of metrics data

Review Comment:
   Yes, I have created a JIRA under the parent KIP-714 task to address this: 
https://issues.apache.org/jira/browse/KAFKA-15807. I am planning to get the 
end-to-end metrics flow working without compression first.
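
   For reference, one possible shape of the decompression helper (a sketch only, assuming `CompressionType#wrapForInput` is a suitable fit; the actual approach will be decided in KAFKA-15807):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;

public class MetricsDecompressionSketch {

    // Decompresses the serialized metrics payload using the codec advertised in
    // the PushTelemetry request, mirroring how record batches are decompressed.
    static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) throws IOException {
        ByteBuffer compressed = ByteBuffer.wrap(metrics);
        try (InputStream in = compressionType.wrapForInput(compressed, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1) {
                out.write(chunk, 0, read);
            }
            return ByteBuffer.wrap(out.toByteArray());
        }
    }
}
```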



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391634085


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);

Review Comment:
   > Could we redirect getErrorResponse to here and rename it to errorResponse?
   
   Done
   
   > If error code is not ThrottlingQuotaExceededException, we should ignore 
throttleTimeMs.
   
   It might be naive, but sorry, I didn't understand why throttleTimeMs should 
not be passed in the response for other exceptions. Don't all requests go through 
the common throttling code, where requests might be throttled for some time based on 
the throughput?
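
   For my own understanding, a minimal sketch of how I read that suggestion (an assumption on my side, not agreed behaviour):

```java
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.protocol.Errors;

public class ThrottleTimeSketch {

    // Only surface a non-zero throttle time when the error is the quota
    // violation itself; all other errors would return throttleTimeMs = 0.
    static PushTelemetryResponseData errorResponseData(int throttleTimeMs, Errors error) {
        PushTelemetryResponseData data = new PushTelemetryResponseData();
        data.setErrorCode(error.code());
        data.setThrottleTimeMs(error == Errors.THROTTLING_QUOTA_EXCEEDED ? throttleTimeMs : 0);
        return data;
    }
}
```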



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391496243


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   The challenge is that the number of connections can vary depending on the type of 
broker host. Larger instances can typically accommodate more connections. Do we 
need to make this an LRU cache? If a client is terminated or idle, we should 
remove it from the cache. Otherwise, we probably should just rely on the 
existing `max.connections` to control the client connections?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391490591


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   Then should it be in the low 100K range, i.e. 2^17 = 131072? I am not sure what the 
right value should be here. I know cloud providers like MSK typically 
allow 3000 active connections per broker, and the limit is higher in the case 
of Confluent, but I am unsure what the typical number of active clients connecting to a 
broker should be. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389719566


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389709078


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-10 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1389684994


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   Then should it be in the low 100K range, i.e. 2^17 = 131072?



##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-09 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1387273811


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,12 +16,48 @@
  */
 package kafka.server;
 
+import java.util.Collections;

Review Comment:
   Should this class be in the same package as other client metric related 
classes like `DefaultClientTelemetryPayload`?



##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-07 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1800097217

   @junrao @hachikuji @AndrewJSchofield @mjsax Could I please get feedback on 
the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-03 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1382258656


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -143,46 +152,43 @@ private static void validateProperties(Properties 
properties) {
 /**
  * Parses the client matching patterns and builds a map with entries that 
has
  * (PatternName, PatternValue) as the entries.
- *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 
1.2.3)
+ * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 
1.2.3)
  * 
- *  NOTES:
- *  Client match pattern splits the input into two parts separated by 
first occurrence of the character '='
+ * NOTES:
+ * Client match pattern splits the input into two parts separated by first 
occurrence of the character '='
  *
- *  @param patterns List of client matching pattern strings
+ * @param patterns List of client matching pattern strings
  * @return map of client matching pattern entries
  */
-public static Map parseMatchingPatterns(List 
patterns) {
-Map patternsMap = new HashMap<>();
-if (patterns != null) {
-patterns.forEach(pattern -> {
-String[] nameValuePair = pattern.split("=");
-if (nameValuePair.length != 2) {
-throw new InvalidConfigurationException("Illegal client 
matching pattern: " + pattern);
-}
-
-String param = nameValuePair[0].trim();
-String patternValue = nameValuePair[1].trim();
-if (isValidParam(param) && isValidRegExPattern(patternValue)) {
-patternsMap.put(param, patternValue);
-} else {
-throw new InvalidConfigurationException("Illegal client 
matching pattern: " + pattern);
-}
-});
+public static Map parseMatchingPatterns(List 
patterns) {

Review Comment:
   Moved the stored match value from the raw string to a compiled pattern to avoid reparsing of 
patterns.
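
   A minimal sketch of the precompiled form (assuming the parsed map now stores `java.util.regex.Pattern` values; error handling simplified):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class MatchingPatternsSketch {

    // Compile each match pattern once at parse time so that later client-match
    // checks do not recompile the regex. Splits on the first '=' only.
    static Map<String, Pattern> parseMatchingPatterns(List<String> patterns) {
        Map<String, Pattern> patternsMap = new HashMap<>();
        for (String pattern : patterns) {
            String[] nameValuePair = pattern.split("=", 2);
            if (nameValuePair.length != 2) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + pattern);
            }
            try {
                patternsMap.put(nameValuePair[0].trim(), Pattern.compile(nameValuePair[1].trim()));
            } catch (PatternSyntaxException e) {
                throw new IllegalArgumentException("Illegal client matching pattern: " + pattern, e);
            }
        }
        return patternsMap;
    }
}
```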



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-03 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1382255993


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation, etc. are
+ * defined in this class.
+ * 
+ * {
+ * 
+ *name: Name supplied by CLI during the creation of the client metric 
subscription.
+ *metrics: List of metric prefixes
+ *intervalMs: A positive integer value >=0  tells the client that how 
often a client can push the metrics
+ *match: List of client matching patterns, that are used by broker to 
match the client instance
+ *   with the subscription.
+ * 
+ * }
+ * 
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * 
+ * "name" is a unique name for the subscription. This is used to 
identify the subscription in
+ *  the broker. Ex: "METRICS-SUB"
+ * "metrics" value should be comma separated metrics list. A prefix 
match on the requested metrics
+ *  is performed in clients to determine subscribed metrics. An empty 
list means no metrics subscribed.
+ *  A list containing just an empty string means all metrics 
subscribed.
+ *  Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ * "interval.ms" should be between 100 and 360 (1 hour). This is 
the interval at which the client
+ *  should push the metrics to the broker.
+ *
+ * "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *  pattern specified then broker considers that as all match which 
means the associated metrics
+ *  applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *  which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ * 
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+public class ClientMetricsConfigs extends AbstractConfig {

Review Comment:
   Resolved conflicts after merging the upstream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-03 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1382109647


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;

Review Comment:
   The KIP-714 says: `This client instance specific state is maintained in 
broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used 
to enforce the push interval rate-limiting. There is no persistence of client 
instance metrics state across broker restarts or between brokers.`
   
   I have started with an LRUCache with oldest-entry eviction but will implement 
something similar to `Selector.IdleExpiryManager`, which cleans up old 
connections.
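
   A rough sketch of the `Selector.IdleExpiryManager`-style sweep I have in mind (field and method names are illustrative, not the planned implementation):

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class ClientInstanceExpirySketch {

    // Access-ordered map: iteration starts with the least recently used entry.
    private final Map<String, Long> lastAccessTimes = new LinkedHashMap<>(16, 0.75f, true);

    void touch(String clientInstanceId, long nowMs) {
        lastAccessTimes.put(clientInstanceId, nowMs);
    }

    void expireIdleInstances(long nowMs, long maxIdleMs) {
        Iterator<Map.Entry<String, Long>> it = lastAccessTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> oldest = it.next();
            if (nowMs - oldest.getValue() <= maxIdleMs) {
                break; // Entries after this one were accessed more recently.
            }
            it.remove();
        }
    }
}
```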



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-03 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1382109087


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   The cache should hold the information of all connected clients, hence it 
should support the number of clients that can connect to a single broker. I kept it 
high enough for now but would appreciate suggestions on the best approach. 
@AndrewJSchofield mentioned that it might be the `max.connections` broker 
config, but that config's upper bound is Int.MAX_VALUE, hence we need to think 
about a better upper bound here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-03 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1382107961


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation, etc. are
+ * defined in this class.
+ * 
+ * {
+ * 
+ *name: Name supplied by CLI during the creation of the client metric 
subscription.
+ *metrics: List of metric prefixes
+ *intervalMs: A positive integer value >=0  tells the client that how 
often a client can push the metrics
+ *match: List of client matching patterns, that are used by broker to 
match the client instance
+ *   with the subscription.
+ * 
+ * }
+ * 
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * 
+ * "name" is a unique name for the subscription. This is used to 
identify the subscription in
+ *  the broker. Ex: "METRICS-SUB"
+ * "metrics" value should be comma separated metrics list. A prefix 
match on the requested metrics
+ *  is performed in clients to determine subscribed metrics. An empty 
list means no metrics subscribed.
+ *  A list containing just an empty string means all metrics 
subscribed.
+ *  Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ * "interval.ms" should be between 100 and 360 (1 hour). This is 
the interval at which the client
+ *  should push the metrics to the broker.
+ *
+ * "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *  pattern specified then broker considers that as all match which 
means the associated metrics
+ *  applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *  which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ * 
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+public class ClientMetricsConfigs extends AbstractConfig {

Review Comment:
   This class largely overlaps with the PR where it was introduced: 
https://github.com/apache/kafka/pull/14621



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org