Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-04-08 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1555905789


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +520,124 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
   @junrao Thanks for the details. I have opened a PR to address this: 
https://github.com/apache/kafka/pull/15680
   cc: @AndrewJSchofield 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-02-22 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1500047932


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +520,124 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
   @apoorvmittal10 : On the server side, we have been using both Yammer metrics 
(org.apache.kafka.server.metrics.KafkaMetricsGroup) and Kafka metrics 
(org.apache.kafka.common.metrics.Metrics). The convention is to use camel case 
for the former and hyphens for the latter. 






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-02-22 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1500028907


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +520,124 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
   @junrao Thanks for bringing this up. I looked into the naming of metrics 
and found that it is currently mixed in Kafka. I have attached two screenshots 
below showing what I see in JMX for the `kafka.server` metrics alone. I was 
wondering whether there are guidelines that new metrics should follow.
   
   https://github.com/apache/kafka/assets/2861565/8435d2ff-a4cc-40e0-93c2-e06ee317fb4f
   https://github.com/apache/kafka/assets/2861565/0363ff09-0dc1-48d6-80de-c112a4da3ba1
   






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-02-21 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1498439741


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +520,124 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
   So far, the naming convention for Kafka metrics is to use hyphens. So, 
ClientMetricsInstanceCount should be client-metrics-instance-count. Ditto 
below. Since the code hasn't been released yet, could we make the change and 
update the KIP?
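
To make the requested renaming concrete, here is a minimal sketch of the camel-case to hyphenated mapping. `MetricNameStyle` and `toHyphenated` are hypothetical names used only for illustration; they are not part of Kafka, and the sketch assumes names made of simple capitalized words with no acronyms.

```java
// Hypothetical helper, not part of Kafka: converts a camel-case metric name
// into the hyphenated style described in the comment above.
final class MetricNameStyle {
    private MetricNameStyle() { }

    static String toHyphenated(String camelCase) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < camelCase.length(); i++) {
            char c = camelCase.charAt(i);
            // Start a new word at every upper-case letter except the first character.
            if (Character.isUpperCase(c) && i > 0) {
                out.append('-');
            }
            out.append(Character.toLowerCase(c));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Prints "client-metrics-instance-count".
        System.out.println(toHyphenated("ClientMetricsInstanceCount"));
    }
}
```

In practice the constants would simply be rewritten by hand; the helper only shows the rule being applied.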






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-30 Thread via GitHub


apoorvmittal10 closed pull request #15251: KAFKA-16186: Broker metrics for 
client telemetry (KIP-714)
URL: https://github.com/apache/kafka/pull/15251





Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1915703872

   > @apoorvmittal10 : Thanks for the updated PR. A few more comments.
   
   Thanks for the review and suggestions. I have addressed the comments.





Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470310097


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
   Valid suggestion, thanks for that. I didn't see that. I have made the change 

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470279663


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -299,8 +332,8 @@ public void 
testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
 // Create new client metrics manager which simulates a new server as 
it will not have any
 // last request information but request should succeed as subscription 
id should match
 // the one with new client instance.
-
-ClientMetricsManager newClientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+kafkaMetrics = new Metrics();

Review Comment:
   Agree, done.






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470278382


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -288,6 +307,9 @@ private ClientMetricsInstance 
createClientInstanceAndUpdateCache(Uuid clientInst
 ClientMetricsInstanceMetadata instanceMetadata) {
 
 ClientMetricsInstance clientInstance = 
createClientInstance(clientInstanceId, instanceMetadata);
+// Maybe add client metrics, if metrics not already added. Metrics 
might be already added
+// if the client instance was evicted from the cache because of size 
limit.

Review Comment:
   The problem is that size-based eviction happens inside the common LRUCache 
class, where code from this manager cannot hook in. I do not see any custom 
override that can be provided to the LRUCache class. 
   
   Having said that, size-based eviction won't leave instance metrics sensors 
hanging indefinitely, as time-based eviction will remove them anyway. Also, if 
the respective instance reports metrics prior to time-based eviction, the 
metric values for that instance will still be reported correctly. Hence, I 
think we should not evict metrics sensors on size-based eviction.
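
For context on what an eviction hook could look like, below is a hedged sketch of an LRU cache that notifies a callback when it evicts on size. It is built directly on `java.util.LinkedHashMap` and is not Kafka's LRUCache; `NotifyingLruCache` and its `onEvict` callback are hypothetical names, and the PR does not take this approach.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical illustration only: an access-ordered LinkedHashMap that invokes
// a callback for each entry it evicts once the size limit is exceeded.
final class NotifyingLruCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    private final int maxSize;
    private final transient BiConsumer<K, V> onEvict;

    NotifyingLruCache(int maxSize, BiConsumer<K, V> onEvict) {
        super(16, 0.75f, true); // true selects access order, which gives LRU behaviour
        this.maxSize = maxSize;
        this.onEvict = onEvict;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        boolean evict = size() > maxSize;
        if (evict) {
            // A caller could remove per-instance sensors here.
            onEvict.accept(eldest.getKey(), eldest.getValue());
        }
        return evict;
    }

    public static void main(String[] args) {
        NotifyingLruCache<String, Integer> cache =
            new NotifyingLruCache<>(2, (k, v) -> System.out.println("evicted " + k));
        cache.put("a", 1);
        cache.put("b", 2);
        cache.put("c", 3); // exceeds the limit, so the least recently used entry is evicted
    }
}
```

This class is not thread safe; a shared cache would need external synchronization.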






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470269365


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   I think you are right. I have removed the tag and made the metric global, 
alongside the instance count metric. I am resolving the comment and will update 
the PR description with the change from the KIP.






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -288,6 +307,9 @@ private ClientMetricsInstance 
createClientInstanceAndUpdateCache(Uuid clientInst
 ClientMetricsInstanceMetadata instanceMetadata) {
 
 ClientMetricsInstance clientInstance = 
createClientInstance(clientInstanceId, instanceMetadata);
+// Maybe add client metrics, if metrics not already added. Metrics 
might be already added
+// if the client instance was evicted from the cache because of size 
limit.

Review Comment:
   Hmm, if we evict a client instance from LRU cache, should we remove the 
corresponding metrics?



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new 

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r146343


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   I don't think it's a good idea to tag with the client instance id. Imagine a 
situation in which an unbounded number of client instance IDs are erroneously 
being used. I don't think we want to build up knowledge of each of them 
individually, but rather to count how many delinquent instances there are.
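
The cardinality concern can be sketched without any Kafka classes. In the example below, `UnknownSubscriptionCounters` is a hypothetical stand-in: one map plays the role of metrics tagged per client instance id, and a single counter plays the role of an untagged, global metric.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical illustration only: compares per-instance bookkeeping with a
// single global counter when many distinct client instance ids show up.
final class UnknownSubscriptionCounters {
    // Tagged style: one entry per client instance id ever seen, so it grows without bound.
    private final Map<UUID, LongAdder> perInstance = new ConcurrentHashMap<>();
    // Global style: a single counter regardless of how many ids are seen.
    private final LongAdder global = new LongAdder();

    void recordTagged(UUID clientInstanceId) {
        perInstance.computeIfAbsent(clientInstanceId, id -> new LongAdder()).increment();
    }

    void recordGlobal() {
        global.increment();
    }

    int taggedSeriesCount() {
        return perInstance.size();
    }

    long globalCount() {
        return global.sum();
    }

    public static void main(String[] args) {
        UnknownSubscriptionCounters counters = new UnknownSubscriptionCounters();
        for (long i = 0; i < 10_000; i++) {
            counters.recordTagged(new UUID(0L, i)); // a distinct id on every request
            counters.recordGlobal();
        }
        System.out.println("tagged series: " + counters.taggedSeriesCount());
        System.out.println("global count:  " + counters.globalCount());
    }
}
```

Both styles count the same number of requests, but only the tagged style keeps state for every id it has ever seen.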






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469854621


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -299,8 +332,8 @@ public void 
testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
 // Create new client metrics manager which simulates a new server as 
it will not have any
 // last request information but request should succeed as subscription 
id should match
 // the one with new client instance.
-
-ClientMetricsManager newClientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+kafkaMetrics = new Metrics();

Review Comment:
   The reason for constructing a new one is that I do not want to associate the 
earlier instance of kafkaMetrics with the new client metrics manager instance, 
otherwise the metrics will collide. Hence I create a new one whenever another 
instance of the client metrics manager is created.
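
A minimal sketch of the collision, using simplified stand-ins rather than the real `Metrics` and `ClientMetricsManager` classes. `SimpleRegistry` and `SimpleManager` are hypothetical; the only behaviour assumed is that a registry rejects a second registration of the same metric name.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a metrics registry that rejects duplicate names.
final class SimpleRegistry {
    private final Set<String> names = new HashSet<>();

    void register(String name) {
        if (!names.add(name)) {
            throw new IllegalArgumentException("Metric already registered: " + name);
        }
    }
}

// Hypothetical stand-in for a manager that registers a fixed metric when constructed.
final class SimpleManager {
    SimpleManager(SimpleRegistry registry) {
        registry.register("ClientMetricsInstanceCount");
    }

    public static void main(String[] args) {
        SimpleRegistry shared = new SimpleRegistry();
        new SimpleManager(shared);
        try {
            new SimpleManager(shared); // second manager on the same registry collides
        } catch (IllegalArgumentException e) {
            System.out.println("collision: " + e.getMessage());
        }
        new SimpleManager(new SimpleRegistry()); // a fresh registry avoids the collision
        System.out.println("fresh registry ok");
    }
}
```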






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469851552


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -1023,5 +1177,18 @@ public void 
testCacheExpirationTaskCancelledOnInstanceUpdate() throws UnknownHos
 assertNotNull(instance);
 assertNotNull(instance.expirationTimerTask());
 assertEquals(1, clientMetricsManager.expirationTimer().size());
+// Metrics size should remain same on instance update.
+assertEquals(12, kafkaMetrics.metrics().size());
+assertEquals((double) 1, 
getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
+}
+
+private KafkaMetric getMetric(String name) throws Exception {
+Optional<Map.Entry<MetricName, KafkaMetric>> metric = 
kafkaMetrics.metrics().entrySet().stream()

Review Comment:
   The map has `MetricName` as the key. Rather than constructing `MetricName` 
again in the tests with tags, I iterated over the map itself, similar to how we 
do it in SelectorTest.java.
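
The lookup style described here can be sketched with plain collections. `MetricLookup`, `Key`, and `findByName` are hypothetical names; `Key` is a simplified stand-in for a metric key made of a name plus tags, not the real `MetricName` class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical illustration only: find a metric by name by scanning the map,
// instead of rebuilding the full key (name plus tags) in the test.
final class MetricLookup {
    // Simplified stand-in for a metric key made of a name and a tag map.
    static final class Key {
        final String name;
        final Map<String, String> tags;

        Key(String name, Map<String, String> tags) {
            this.name = name;
            this.tags = tags;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return name.equals(other.name) && tags.equals(other.tags);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, tags);
        }
    }

    // Returns the value of the first metric whose key has the given name, ignoring tags.
    static Optional<Double> findByName(Map<Key, Double> metrics, String name) {
        return metrics.entrySet().stream()
            .filter(e -> e.getKey().name.equals(name))
            .map(Map.Entry::getValue)
            .findFirst();
    }

    public static void main(String[] args) {
        Map<Key, Double> metrics = new HashMap<>();
        metrics.put(new Key("ClientMetricsInstanceCount", Collections.emptyMap()), 1.0);
        metrics.put(new Key("ClientMetricsThrottle", Collections.singletonMap("client", "x")), 0.0);
        System.out.println(findByName(metrics, "ClientMetricsInstanceCount"));
    }
}
```

The scan is linear in the number of metrics, which is acceptable in a test but would not suit a hot path.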






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469849574


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));

Review Comment:
   I don't think we should have a tag with the error reason; rather, we should 
log it. I have added the log line in the PR.






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469845294


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -288,6 +307,9 @@ private ClientMetricsInstance 
createClientInstanceAndUpdateCache(Uuid clientInst
 ClientMetricsInstanceMetadata instanceMetadata) {
 
 ClientMetricsInstance clientInstance = 
createClientInstance(clientInstanceId, instanceMetadata);
+// Maybe add client metrics, if metrics not already added. Metrics 
might be already added
+// if the client instance was evicted from the cache because of size 
limit.

Review Comment:
   The only place where we do not do that is when LRU cache eviction happens 
because of size, which practically shouldn't happen, but I have added a safety 
check so that we do not get a duplicate metric registration error.
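
The safety check can be sketched as a guarded, idempotent registration. `GuardedSensorRegistry` and the sensor prefixes below are hypothetical stand-ins; the sketch follows the assumption stated in the diff that if one sensor exists for a client instance, all of them do.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only: register per-instance sensors at most once,
// so a re-created client instance does not trigger duplicate registration.
final class GuardedSensorRegistry {
    private static final String[] SENSOR_PREFIXES = {"Throttle", "PluginExport", "PluginError"};

    private final Set<String> sensorNames = ConcurrentHashMap.newKeySet();

    // Returns true if this call registered the sensors, false if they already existed.
    boolean maybeAddSensors(String clientInstanceId) {
        // If one sensor exists for the instance, assume all of them do.
        if (sensorNames.contains(SENSOR_PREFIXES[0] + "-" + clientInstanceId)) {
            return false;
        }
        for (String prefix : SENSOR_PREFIXES) {
            sensorNames.add(prefix + "-" + clientInstanceId);
        }
        return true;
    }

    int sensorCount() {
        return sensorNames.size();
    }

    public static void main(String[] args) {
        GuardedSensorRegistry registry = new GuardedSensorRegistry();
        System.out.println(registry.maybeAddSensors("instance-1")); // first call registers
        System.out.println(registry.maybeAddSensors("instance-1")); // second call is a no-op
        System.out.println(registry.sensorCount());
    }
}
```

The check and the registration are not atomic as a pair, so callers that can race on the same instance id would need additional coordination.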






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469843003


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);

Review Comment:
   Yes, this needs to be corrected in the KIP.






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469836415


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);
+}
+
+public void recordPluginErrorCount(Uuid clientInstanceId) {
+

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469835995


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");

Review Comment:
   Done.






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469798809


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
   I might be wrong, but we have a `record` method on the sensor where `PLUGIN_EXPORT 

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469784272


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   If we do want a simple `ClientMetricsUnknownSubscriptionRequestCount`, then I 
can get rid of the client instance id and the counter can simply track the 
number of unknown subscriptions. But if we do want to add a tag with the client 
software version, then the metrics in JMX will have separate buckets under 
`ClientMetrics` per client software version, each with its own count. 
   My reasoning for tagging with the client instance id was that all metrics for 
an instance fall in the same bucket in JMX. Also, since we already have client 
metrics that identify which client software version is associated with which 
client instance, we can find the software version anyway, if required.
   Please let me know what you think.
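
   The bucketing trade-off above can be illustrated with a small, self-contained 
sketch. It does not use the Kafka metrics API: `TagBuckets`, `Client`, and the 
sample ids and versions are hypothetical, and each bucket only models one tagged 
counter as it might appear under `ClientMetrics`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of tagged counters; not the Kafka metrics API.
final class TagBuckets {
    // Hypothetical client record: an instance id plus a software version.
    static final class Client {
        final String instanceId;
        final String softwareVersion;

        Client(String instanceId, String softwareVersion) {
            this.instanceId = instanceId;
            this.softwareVersion = softwareVersion;
        }
    }

    // Records one event per client and groups the counts by whichever
    // tag value the supplied function selects.
    static Map<String, Integer> countBy(Iterable<Client> clients,
                                        Function<Client, String> tagValue) {
        Map<String, Integer> buckets = new HashMap<>();
        for (Client client : clients) {
            buckets.merge(tagValue.apply(client), 1, Integer::sum);
        }
        return buckets;
    }
}
```

   Tagging by instance id yields one bucket per client instance, while tagging 
by software version collapses instances that share a version into a single 
bucket.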






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465945556


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,

Review Comment:
   Yes. I anticipate there will be a KIP correction needed here. Once we're 
happy with the PR, I'll pull together a correction.






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465454670


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);
+}
+
+public void recordPluginErrorCount(Uuid clientInstanceId) {
+

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465476918


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,

Review Comment:
   The KIP includes a tag for software version. Should we correct the KIP?






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465492172


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -1023,5 +1177,18 @@ public void 
testCacheExpirationTaskCancelledOnInstanceUpdate() throws UnknownHos
 assertNotNull(instance);
 assertNotNull(instance.expirationTimerTask());
 assertEquals(1, clientMetricsManager.expirationTimer().size());
+// Metrics size should remain same on instance update.
+assertEquals(12, kafkaMetrics.metrics().size());
+assertEquals((double) 1, 
getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
+}
+
+private KafkaMetric getMetric(String name) throws Exception {
+Optional<Map.Entry<MetricName, KafkaMetric>> metric = 
kafkaMetrics.metrics().entrySet().stream()

Review Comment:
   Hmm, why do we need to iterate the whole map instead of doing a `get` based 
on name on the map?
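
   The difference between the two lookups can be sketched as below. This is an 
illustrative model only: `MetricLookup` and its `Key` class are hypothetical, 
loosely mirroring a map keyed by a composite metric name (name plus group) 
rather than by a plain string.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical model of a metrics map with a composite key.
final class MetricLookup {
    // Hypothetical composite key: a direct get needs every component.
    static final class Key {
        final String name;
        final String group;

        Key(String name, String group) {
            this.name = name;
            this.group = group;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return name.equals(other.name) && group.equals(other.group);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, group);
        }
    }

    // Direct lookup: a single hash lookup, but the full key must be rebuilt.
    static Optional<Double> byKey(Map<Key, Double> metrics, String name, String group) {
        return Optional.ofNullable(metrics.get(new Key(name, group)));
    }

    // Scan: works with the name alone, at the cost of visiting every entry.
    static Optional<Double> byName(Map<Key, Double> metrics, String name) {
        return metrics.entrySet().stream()
            .filter(entry -> entry.getKey().name.equals(name))
            .map(Map.Entry::getValue)
            .findFirst();
    }
}
```

   When the test already knows the group (and any tags), the direct lookup is 
the simpler choice; the scan is only needed when part of the key is unknown.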






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465474347


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   The KIP includes a tag with client software version. Is that correct?






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465446342


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
   Since these two metrics are derived from the same source, they can be on the 
same 

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465497737


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -299,8 +332,8 @@ public void 
testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
 // Create new client metrics manager which simulates a new server as 
it will not have any
 // last request information but request should succeed as subscription 
id should match
 // the one with new client instance.
-
-ClientMetricsManager newClientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+kafkaMetrics = new Metrics();

Review Comment:
   We create a new instance during setup already. Do we need to construct a new 
instance here? Ditto in a few other places.



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -191,8 +206,11 @@ public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest re
 byte[] metrics = request.data().metrics();
 if (metrics != null && metrics.length > 0) {
 try {
+long exportTimeStartMs = time.milliseconds();

Review Comment:
   If we just want to measure the amount of time passed, `time.hiResClockMs` is 
cheaper.
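
A minimal sketch of the idea behind this suggestion, in plain Java rather than Kafka's `Time` interface. As far as I know, `hiResClockMs` is derived from the JVM's monotonic nanosecond clock, so the sketch assumes `System.nanoTime()` as the source. `ElapsedTimeSketch`, `monotonicMs` and `timeMs` are hypothetical names used only for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka: measures elapsed time with a monotonic clock.
final class ElapsedTimeSketch {

    // Millisecond reading derived from System.nanoTime(). The value is only meaningful
    // as a difference between two readings, and it is not affected by wall-clock changes.
    static long monotonicMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    // Runs the task and returns the elapsed milliseconds according to the supplied clock.
    static long timeMs(LongSupplier clockMs, Runnable task) {
        long startMs = clockMs.getAsLong();
        task.run();
        return clockMs.getAsLong() - startMs;
    }

    public static void main(String[] args) {
        long elapsedMs = timeMs(ElapsedTimeSketch::monotonicMs, () -> {
            // Stand-in for the plugin's exportMetrics call.
        });
        System.out.println("export took " + elapsedMs + " ms");
    }
}
```

Passing the clock in as a `LongSupplier` keeps the measurement testable with a fake clock, which is the same reason the manager takes a `time` object instead of calling the system clock directly.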



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);

Review Comment:
   > Total number of metrics requests GetTelemetrySubscriptionsRequests with 
unknown CLIENT_INSTANCE_IDs.
   
   The above is the description of the metric in the KIP. It seems inaccurate 
since it should be for an unknown subscriptionId instead of CLIENT_INSTANCE_ID. 
Also, we track it for PushTelemetryRequest instead of 
GetTelemetrySubscriptionsRequest.



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


AndrewJSchofield commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1907905085

   I see that you have a few differences from the KIP. It sounds like you're 
highlighting situations in which the KIP deviates slightly from normal Kafka 
practice. I would support adjusting the KIP to match.
   
   The broker software version tag doesn't seem useful. There is already 
another metric for that specific information.





Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1464735557


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   This one should not include the client instance ID. The KIP is expecting a 
simple `ClientMetricsUnknownSubscriptionRequestCount` I believe. As such, I 
don't think it's an instance metric in this case.
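
To illustrate the distinction, here is a rough sketch in plain Java, not the Kafka `Metrics` API. A broker-wide sensor uses the bare metric name, while a per-instance sensor appends the client instance ID the way the diff does. `SensorNamingSketch` and its methods are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a metrics registry; it only counts events per sensor name.
final class SensorNamingSketch {
    static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
    static final String THROTTLE = "ClientMetricsThrottle";

    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Broker-wide sensor: no client instance ID in the name, so all clients share one counter.
    void recordUnknownSubscription() {
        counters.computeIfAbsent(UNKNOWN_SUBSCRIPTION_REQUEST, k -> new LongAdder()).increment();
    }

    // Per-instance sensor: the name is suffixed with the client instance ID.
    void recordThrottle(UUID clientInstanceId) {
        counters.computeIfAbsent(THROTTLE + "-" + clientInstanceId, k -> new LongAdder()).increment();
    }

    // Returns the number of events recorded under the given sensor name, or 0 if none.
    long count(String sensorName) {
        LongAdder adder = counters.get(sensorName);
        return adder == null ? 0L : adder.sum();
    }

    // Returns how many distinct sensors have been created so far.
    int sensorCount() {
        return counters.size();
    }

    public static void main(String[] args) {
        SensorNamingSketch stats = new SensorNamingSketch();
        UUID clientA = UUID.randomUUID();
        UUID clientB = UUID.randomUUID();
        stats.recordUnknownSubscription();
        stats.recordUnknownSubscription();
        stats.recordThrottle(clientA);
        stats.recordThrottle(clientB);
        // One shared sensor plus one throttle sensor per client instance.
        System.out.println("sensors: " + stats.sensorCount());
    }
}
```

With the broker-wide form, the number of sensors for this metric stays constant no matter how many client instances connect.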



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");

Review Comment:
   I think this is a typo in the KIP really, copied to the code, but I would 
say "client metrics instances".






Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


apoorvmittal10 commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1907853039

   @AndrewJSchofield @junrao Could you please review and provide feedback?





[PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


apoorvmittal10 opened a new pull request, #15251:
URL: https://github.com/apache/kafka/pull/15251

   KIP-714 defines broker metrics 
[here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metrics)
 which should help determine the client instance metrics managed by the 
broker. However, the implementation differs in a few places from the metrics 
defined in the KIP; the differences are highlighted below. We can discuss and 
amend either the KIP or the implementation, though I prefer the former.
   
   There are two ways to emit metrics in the broker: Kafka Metrics or Yammer 
Metrics. I have used Kafka Metrics because I want minimal dependency on Yammer 
(though I am open to suggestions).
   
   **Changes/Query** | Metric Name | Type | Group | Tags
   -- | -- | -- | -- | --
   What should the tag for the broker's version be named, and how can we fetch the version? I do not see any placeholder in RequestContext. | ClientMetricsInstanceCount | Gauge | ClientMetrics | version: broker's software version
   The KIP defines this metric as a Meter, which in Kafka Metrics also requires a Rate metric, so two metrics are emitted: `ClientMetricsPluginErrorCount` and `ClientMetricsPluginErrorRate` | ClientMetricsPluginErrorCount | Meter | ClientMetrics | client_instance_id, reason (reason for the failure)
   The KIP defines this metric as a Meter, which in Kafka Metrics also requires a Rate metric, so two metrics are emitted: `ClientMetricsPluginExportCount` and `ClientMetricsPluginExportRate` | ClientMetricsPluginExportCount | Meter | ClientMetrics | client_instance_id
   The KIP defines this metric as a Meter, which in Kafka Metrics also requires a Rate metric, so two metrics are emitted: `ClientMetricsThrottleCount` and `ClientMetricsThrottleRate` | ClientMetricsThrottleCount | Meter | ClientMetrics | client_instance_id
   The KIP defines this metric as a Meter, which in Kafka Metrics also requires a Rate metric, so two metrics are emitted: `ClientMetricsUnknownSubscriptionRequestCount` and `ClientMetricsUnknownSubscriptionRequestRate` | ClientMetricsUnknownSubscriptionRequestCount | Meter | ClientMetrics | client version: client's software version
   The KIP defines this metric as a Histogram, but we generally use `Avg` and `Max` to capture latency-related metrics (which is what the Histogram is for), so I have two metrics: `ClientMetricsPluginExportTimeAvg` and `ClientMetricsPluginExportTimeMax` | ClientMetricsPluginExportTime | Histogram | ClientMetrics | client_instance_id
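
The name expansion described in the table can be sketched as follows. This is plain Java for illustration only and does not use the Kafka Metrics API. `EmittedMetricNamesSketch`, `KipType` and `emittedNames` are hypothetical names, and the base names are the constants from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of how one KIP metric maps to the names emitted in this PR.
final class EmittedMetricNamesSketch {

    // Metric types as listed in the KIP table.
    enum KipType { GAUGE, METER, HISTOGRAM }

    // Returns the metric names emitted for a base name, following the table above:
    // a Meter becomes Count and Rate, a Histogram becomes Avg and Max, a Gauge stays as is.
    static List<String> emittedNames(String baseName, KipType type) {
        switch (type) {
            case METER:
                return Arrays.asList(baseName + "Count", baseName + "Rate");
            case HISTOGRAM:
                return Arrays.asList(baseName + "Avg", baseName + "Max");
            default:
                return Collections.singletonList(baseName);
        }
    }

    public static void main(String[] args) {
        System.out.println(emittedNames("ClientMetricsInstanceCount", KipType.GAUGE));
        System.out.println(emittedNames("ClientMetricsThrottle", KipType.METER));
        System.out.println(emittedNames("ClientMetricsPluginExportTime", KipType.HISTOGRAM));
    }
}
```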
   
   
   Image: https://github.com/apache/kafka/assets/2861565/32172c3c-b098-49f7-b5d8-5eaabf73fcb1
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

