Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1555905789

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +520,124 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
@junrao thanks for the details. I have opened a PR to address this: https://github.com/apache/kafka/pull/15680

cc: @AndrewJSchofield

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1500047932

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +520,124 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
@apoorvmittal10 : On the server side, we have been using both Yammer metrics (org.apache.kafka.server.metrics.KafkaMetricsGroup) and Kafka metrics (org.apache.kafka.common.metrics.Metrics). The convention is that we use camel case for the former and hyphens for the latter.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1500028907

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +520,124 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
@junrao Thanks for bringing this up. I did look into the naming for metrics and found that it is somewhat mixed in Kafka at the moment. Below are 2 screenshots of what I see in JMX for the `kafka.server` metrics themselves. I was also wondering whether there are guidelines we should follow for new metrics.

https://github.com/apache/kafka/assets/2861565/8435d2ff-a4cc-40e0-93c2-e06ee317fb4f
https://github.com/apache/kafka/assets/2861565/0363ff09-0dc1-48d6-80de-c112a4da3ba1
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1498439741

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +520,124 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";

Review Comment:
So far, the naming convention for Kafka metrics is to use hyphens. So, ClientMetricsInstanceCount should be client-metrics-instance-count. Ditto below. Since the code hasn't been released yet, could we make the change and update the KIP?
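The rename requested here is mechanical: each CamelCase constant becomes a lower-case, hyphen-separated name. A small sketch, assuming a hypothetical `MetricNameStyle` helper that is not part of Kafka, makes the mapping for the `ClientMetrics*` constants explicit:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Kafka): converts a CamelCase, Yammer-style metric
// name into the hyphenated style used for Kafka metrics. Every upper-case letter is
// treated as the start of a new word, so an acronym such as "JVM" would become "j-v-m".
final class MetricNameStyle {

    private MetricNameStyle() {
    }

    static String toHyphenated(String camelCase) {
        StringBuilder sb = new StringBuilder(camelCase.length() + 8);
        for (int i = 0; i < camelCase.length(); i++) {
            char c = camelCase.charAt(i);
            // Insert a hyphen before each upper-case letter except the first character.
            if (i > 0 && Character.isUpperCase(c)) {
                sb.append('-');
            }
            sb.append(Character.toLowerCase(c));
        }
        return sb.toString();
    }

    // Applies the same conversion to every constant ("Ditto below").
    static List<String> toHyphenatedAll(List<String> camelCaseNames) {
        return camelCaseNames.stream()
            .map(MetricNameStyle::toHyphenated)
            .collect(Collectors.toList());
    }
}
```

For example, `MetricNameStyle.toHyphenated("ClientMetricsInstanceCount")` returns `client-metrics-instance-count`. In the actual code the new names would more likely just be written as string literals; the helper only spells out the rule.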
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 closed pull request #15251: KAFKA-16186: Broker metrics for client telemetry (KIP-714)
URL: https://github.com/apache/kafka/pull/15251
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1915703872

> @apoorvmittal10 : Thanks for the updated PR. A few more comments.

Thanks for the review and suggestions. I have addressed the comments.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470310097

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);
+            unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+            sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+            Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
+            throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));
+            sensorsName.add(throttleCount.name());
+
+            Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+            pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags));
+            sensorsName.add(pluginExportCount.name());
+
+            Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+            pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags));
+            sensorsName.add(pluginErrorCount.name());
+
+            Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+            pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg",
+                ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg());
+            pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max",
+                ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max());
+            sensorsName.add(pluginExportTime.name());
+        }
+
+        public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+            record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+        }
+
+        public void recordThrottleCount(Uuid clientInstanceId) {
+            record(THROTTLE, clientInstanceId);
+        }
+
+        public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+            record(PLUGIN_EXPORT, clientInstanceId);
+            record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
Valid suggestion, thanks. I had missed that. I have made the change.
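In the quoted hunk, each client instance gets its own sensors named `<metric>-<clientInstanceId>`, and `recordPluginExport` updates two of them per call: the export count and the export time (average and maximum). The sketch below models that with plain JDK types. `InstanceExportStats` is hypothetical, and its cumulative stats are a simplification: Kafka's `WindowedCount`, `Avg` and `Max` are sampled over time windows, which this sketch does not attempt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the per-instance sensors in ClientMetricsStats.
// Values are cumulative (no time windows); the point is which stats one export call touches.
final class InstanceExportStats {

    // Running aggregates for one sensor.
    static final class Stat {
        private long count;
        private double sum;
        private double max = Double.NEGATIVE_INFINITY;

        synchronized void record(double value) {
            count++;
            sum += value;
            max = Math.max(max, value);
        }

        synchronized long count() {
            return count;
        }

        synchronized double avg() {
            return count == 0 ? Double.NaN : sum / count;
        }

        synchronized double max() {
            return max;
        }
    }

    private final Map<String, Stat> sensors = new ConcurrentHashMap<>();

    // Sensor names follow the "<metric>-<clientInstanceId>" pattern from the quoted hunk.
    private Stat sensor(String metric, String clientInstanceId) {
        return sensors.computeIfAbsent(metric + "-" + clientInstanceId, k -> new Stat());
    }

    // One export call updates both the export counter and the export-time stats.
    void recordPluginExport(String clientInstanceId, long timeMs) {
        sensor("plugin-export", clientInstanceId).record(1);
        sensor("plugin-export-time", clientInstanceId).record(timeMs);
    }

    // Returns null when nothing has been recorded for that instance yet.
    Stat get(String metric, String clientInstanceId) {
        return sensors.get(metric + "-" + clientInstanceId);
    }
}
```

After two exports of 10 ms and 30 ms for the same instance, the count sensor holds 2 and the time sensor reports an average of 20 ms and a maximum of 30 ms.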
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470279663

## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ##
@@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
         // Create new client metrics manager which simulates a new server as it will not have any
         // last request information but request should succeed as subscription id should match
         // the one with new client instance.
-
-        ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+        kafkaMetrics = new Metrics();

Review Comment:
Agree, done.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470278382

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst
         ClientMetricsInstanceMetadata instanceMetadata) {
         ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata);
+        // Maybe add client metrics, if metrics not already added. Metrics might be already added
+        // if the client instance was evicted from the cache because of size limit.

Review Comment:
The problem is that size-based eviction happens inside the common LRUCache class, where code from this manager cannot hook in. I do not see any custom override that can be provided to the LRUCache class. Having said that, size-based eviction won't leave dangling instance metric sensors, as time-based eviction will remove those instance metrics anyway. Also, if the respective instance reports metrics before time-based eviction, the metric values for that instance will still be reported correctly. Hence, I think we should not remove metric sensors on size-based eviction.
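The argument relies on the expiration task being independent of the size-bounded cache. A minimal model, assuming a hypothetical `ExpiringInstanceSensors` class that tracks last-seen times separately from any cache, shows leftover per-instance sensors being removed once an instance has been idle longer than the TTL:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: per-instance sensors are removed by a time-based expiration sweep
// that does not depend on the size-bounded cache, so a sensor left behind after a
// size-based eviction is still cleaned up once the instance goes idle.
final class ExpiringInstanceSensors {

    private final long ttlMs;
    // Last time each client instance sent a request, keyed by client instance id.
    private final Map<String, Long> lastSeenMs = new ConcurrentHashMap<>();
    // Names of the per-instance sensors that are currently registered.
    private final Set<String> sensorNames = ConcurrentHashMap.newKeySet();

    ExpiringInstanceSensors(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void onRequest(String clientInstanceId, long nowMs) {
        lastSeenMs.put(clientInstanceId, nowMs);
        sensorNames.add(sensorName(clientInstanceId)); // no-op if already registered
    }

    // Expiration task: drop instances idle for longer than ttlMs together with their sensors.
    void expire(long nowMs) {
        lastSeenMs.entrySet().removeIf(entry -> {
            boolean expired = nowMs - entry.getValue() > ttlMs;
            if (expired) {
                sensorNames.remove(sensorName(entry.getKey()));
            }
            return expired;
        });
    }

    int sensorCount() {
        return sensorNames.size();
    }

    private static String sensorName(String clientInstanceId) {
        return "plugin-export-" + clientInstanceId;
    }
}
```

With a TTL of 1000 ms, instances last seen at 0 ms and 500 ms leave one sensor after a sweep at 1200 ms and none after a sweep at 1600 ms.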
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470269365

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
I think you are right. I have removed the tag and made the metric global, alongside the instance count metric. I am resolving this comment and will update the PR description with the change from the KIP.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst
         ClientMetricsInstanceMetadata instanceMetadata) {
         ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata);
+        // Maybe add client metrics, if metrics not already added. Metrics might be already added
+        // if the client instance was evicted from the cache because of size limit.

Review Comment:
Hmm, if we evict a client instance from the LRU cache, should we remove the corresponding metrics?

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);
+            unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+            sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+            Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
+            throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));
+            sensorsName.add(throttleCount.name());
+
+            Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+            pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags));
+            sensorsName.add(pluginExportCount.name());
+
+            Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+            pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags));
+            sensorsName.add(pluginErrorCount.name());
+
+            Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+            pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg",
+                ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg());
+            pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max",
+                ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r146343

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
I don't think it's a good idea to tag with the client instance id. Imagine a situation in which an unbounded number of client instance IDs are erroneously being used. I don't think we want to build up knowledge of each of them individually, but rather count how many delinquent instances there are.
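The cardinality concern can be made concrete. In the hypothetical `UnknownSubscriptionCounters` sketch below (plain JDK types, not Kafka metrics), a counter keyed by client instance ID grows by one entry per distinct ID, while an untagged global counter stays a single value:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical comparison of two ways to count unknown-subscription requests.
final class UnknownSubscriptionCounters {

    // Tagged by client instance id: one counter (and one exported metric) per distinct id,
    // so misbehaving clients that keep sending new ids make this grow without bound.
    final Map<String, AtomicLong> perInstance = new ConcurrentHashMap<>();

    // Untagged global counter: constant size no matter how many ids are seen.
    final AtomicLong global = new AtomicLong();

    void record(String clientInstanceId) {
        perInstance.computeIfAbsent(clientInstanceId, id -> new AtomicLong()).incrementAndGet();
        global.incrementAndGet();
    }
}
```

With 1,000 distinct IDs the tagged variant holds 1,000 counters, each of which would surface as its own metric, while the global counter still reports a single number.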
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469854621

## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ##
@@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
         // Create new client metrics manager which simulates a new server as it will not have any
         // last request information but request should succeed as subscription id should match
         // the one with new client instance.
-
-        ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+        kafkaMetrics = new Metrics();

Review Comment:
The reason for constructing a new one is that I do not want to associate the earlier kafkaMetrics instance with the new client metrics manager instance; otherwise the metrics would collide. Hence, I create a new one when another instance of the client metrics manager is created.
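The collision can be illustrated with a toy registry, assuming (as the comment implies) that registering a second metric under an existing name in the same registry fails. `SimpleMetricRegistry` and `SketchClientMetricsManager` are hypothetical stand-ins for `Metrics` and `ClientMetricsManager`, not their real APIs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that rejects a second metric with the same name.
final class SimpleMetricRegistry {
    private final Map<String, Object> metrics = new ConcurrentHashMap<>();

    void register(String name, Object metric) {
        if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException("A metric named '" + name + "' already exists");
        }
    }

    int size() {
        return metrics.size();
    }
}

// Hypothetical manager that, like ClientMetricsStats, registers a global metric in its constructor.
final class SketchClientMetricsManager {
    SketchClientMetricsManager(SimpleMetricRegistry registry) {
        registry.register("client-metrics-instance-count", new Object());
    }
}
```

Constructing a second manager against the shared registry throws, while a fresh registry accepts it, which matches the reason the test creates a new `Metrics` instance before building the second manager.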
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469851552

## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ##
@@ -1023,5 +1177,18 @@ public void testCacheExpirationTaskCancelledOnInstanceUpdate() throws UnknownHos
         assertNotNull(instance);
         assertNotNull(instance.expirationTimerTask());
         assertEquals(1, clientMetricsManager.expirationTimer().size());
+        // Metrics size should remain same on instance update.
+        assertEquals(12, kafkaMetrics.metrics().size());
+        assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
+    }
+
+    private KafkaMetric getMetric(String name) throws Exception {
+        Optional<Map.Entry<MetricName, KafkaMetric>> metric = kafkaMetrics.metrics().entrySet().stream()

Review Comment:
The map has `MetricName` as the key. Rather than constructing the `MetricName` again in tests with tags, I iterated over the map itself, similar to what we do in SelectorTest.java.
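Because the map key includes the group and tags, a direct `get` only works with an identical key. A sketch of the name-only lookup, using a hypothetical `SketchMetricName` class in place of Kafka's `MetricName` and a hypothetical tag key `instance`:

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified metric name whose identity includes group and tags,
// so a direct map lookup needs exactly the tag map the production code used.
final class SketchMetricName {
    final String name;
    final String group;
    final Map<String, String> tags;

    SketchMetricName(String name, String group, Map<String, String> tags) {
        this.name = name;
        this.group = group;
        this.tags = tags;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchMetricName)) return false;
        SketchMetricName other = (SketchMetricName) o;
        return name.equals(other.name) && group.equals(other.group) && tags.equals(other.tags);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, group, tags);
    }
}

final class MetricLookup {
    // Scans the entries and matches on the name only, instead of rebuilding the full key.
    static <V> Optional<V> findByName(Map<SketchMetricName, V> metrics, String name) {
        return metrics.entrySet().stream()
            .filter(entry -> entry.getKey().name.equals(name))
            .map(Map.Entry::getValue)
            .findFirst();
    }
}
```

The trade-off is that a name-only scan returns the first match, so it assumes the name is unique among the registered metrics the test cares about.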
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469849574

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);
+            unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+            sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+            Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
+            throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));
+            sensorsName.add(throttleCount.name());
+
+            Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+            pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags));
+            sensorsName.add(pluginExportCount.name());
+
+            Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+            pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags));

Review Comment:
I don't think we should have a tag with the error reason; rather, we should log it. I have added the log line in the PR.
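A sketch of the approach described in the reply, assuming a hypothetical `PluginExportRecorder` and using `java.util.logging` in place of the broker's logger: a failed export increments a single error count, and the failure reason goes to the log rather than into a metric tag.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: count plugin export failures in one metric and put the failure
// reason in the log, rather than adding an error-reason tag (one metric per distinct reason).
final class PluginExportRecorder {
    private static final Logger LOG = Logger.getLogger(PluginExportRecorder.class.getName());

    private final AtomicLong pluginErrors = new AtomicLong();

    // Runs the export call and returns true on success.
    // On failure, bumps the error count, logs the cause and returns false.
    boolean export(String clientInstanceId, Runnable exportCall) {
        try {
            exportCall.run();
            return true;
        } catch (RuntimeException e) {
            pluginErrors.incrementAndGet();
            LOG.log(Level.WARNING, "Error exporting client metrics for client instance " + clientInstanceId, e);
            return false;
        }
    }

    long errorCount() {
        return pluginErrors.get();
    }
}
```

Tagging by reason would instead create a new metric for every distinct error message, which brings back the unbounded-cardinality problem raised for client instance IDs.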
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1469845294

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst
         ClientMetricsInstanceMetadata instanceMetadata) {
         ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata);
+        // Maybe add client metrics, if metrics not already added. Metrics might be already added
+        // if the client instance was evicted from the cache because of size limit.

Review Comment:
The only place where we do not do that is when the LRU cache evicts an entry because of its size limit. That practically shouldn't happen, but I have added a safety check so that we do not get a duplicate metric registration error.
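The safety check can be modeled with a size-bounded LRU map that evicts without notifying the metrics code. In this hypothetical, single-threaded `LruWithSensors` sketch, a JDK `LinkedHashMap` stands in for Kafka's `LRUCache`. Re-creating an instance whose sensor survived a size-based eviction becomes a no-op instead of a duplicate-registration error:

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model: the size-bounded LRU cache evicts entries without
// telling the metrics code, so a recreated instance may find its sensor still registered.
final class LruWithSensors {
    private final Set<String> registeredSensors = new HashSet<>();
    private final Map<String, Object> cache;

    LruWithSensors(int maxSize) {
        // Access-ordered LinkedHashMap that drops its eldest entry once maxSize is exceeded.
        this.cache = new LinkedHashMap<String, Object>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
                return size() > maxSize;
            }
        };
    }

    void createInstance(String clientInstanceId) {
        cache.put(clientInstanceId, new Object());
        maybeAddSensor(clientInstanceId);
    }

    // The "maybe add" guard: skip registration if the sensor outlived a size-based eviction.
    private void maybeAddSensor(String clientInstanceId) {
        String name = "plugin-export-" + clientInstanceId;
        if (registeredSensors.contains(name)) {
            return;
        }
        register(name);
    }

    // Strict registration: a second sensor with the same name is an error.
    private void register(String name) {
        if (!registeredSensors.add(name)) {
            throw new IllegalArgumentException("Sensor " + name + " is already registered");
        }
    }

    boolean isCached(String clientInstanceId) {
        return cache.containsKey(clientInstanceId);
    }

    int sensorCount() {
        return registeredSensors.size();
    }
}
```

With `maxSize` of 1, creating `a`, then `b`, then `a` again leaves one cached instance and two registered sensors, and the third call does not throw.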
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1469843003 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); Review Comment: Yes, this needs to be corrected in the KIP.
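The diff's `maybeAddClientInstanceMetrics` checks a single sentinel sensor (`PLUGIN_EXPORT + "-" + clientInstanceId`) to decide whether all per-instance sensors already exist. Below is a minimal stdlib-only sketch of that idempotent registration. It assumes hypothetical names (`InstanceSensorRegistry`, `maybeAddClientInstanceSensors`), uses `java.util.UUID` in place of Kafka's `Uuid`, and uses a plain map instead of Kafka's `Metrics`/`Sensor` classes.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for per-instance sensor registration; not Kafka's Metrics API.
final class InstanceSensorRegistry {
    // Base names copied from the constants in the diff.
    static final List<String> INSTANCE_SENSORS = List.of(
            "ClientMetricsUnknownSubscriptionRequest",
            "ClientMetricsThrottle",
            "ClientMetricsPluginExport",
            "ClientMetricsPluginError",
            "ClientMetricsPluginExportTime");

    // Sensor name -> placeholder value standing in for a real sensor object.
    private final Map<String, Long> sensors = new ConcurrentHashMap<>();

    // Adds every per-instance sensor, or returns early if the sentinel sensor exists.
    // The sentinel check is only valid because all sensors are added in the same call.
    void maybeAddClientInstanceSensors(UUID clientInstanceId) {
        if (sensors.containsKey("ClientMetricsPluginExport-" + clientInstanceId)) {
            return;
        }
        for (String base : INSTANCE_SENSORS) {
            // putIfAbsent keeps a racing second caller from resetting an existing sensor.
            sensors.putIfAbsent(base + "-" + clientInstanceId, 0L);
        }
    }

    int sensorCount() {
        return sensors.size();
    }
}
```

Registering the same instance twice leaves five sensors; a second instance adds five more of its own.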
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1469836415 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); +unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); +sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + +Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); +throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); +sensorsName.add(throttleCount.name()); + +Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); +pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); +sensorsName.add(pluginExportCount.name()); + +Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); +pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); +sensorsName.add(pluginErrorCount.name()); + +Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", +ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", +ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); +sensorsName.add(pluginExportTime.name()); +} + +public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { +record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); +} + +public 
void recordThrottleCount(Uuid clientInstanceId) { +record(THROTTLE, clientInstanceId); +} + +public void recordPluginExport(Uuid clientInstanceId, long timeMs) { +record(PLUGIN_EXPORT, clientInstanceId); +record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); +} + +public void recordPluginErrorCount(Uuid clientInstanceId) { +
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1469835995 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); Review Comment: Done.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1469798809 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); +unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); +sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + +Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); +throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); +sensorsName.add(throttleCount.name()); + +Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); +pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); +sensorsName.add(pluginExportCount.name()); + +Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); +pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); +sensorsName.add(pluginErrorCount.name()); + +Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", +ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", +ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); +sensorsName.add(pluginExportTime.name()); +} + +public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { +record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); +} + +public 
void recordThrottleCount(Uuid clientInstanceId) { +record(THROTTLE, clientInstanceId); +} + +public void recordPluginExport(Uuid clientInstanceId, long timeMs) { +record(PLUGIN_EXPORT, clientInstanceId); +record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); Review Comment: I might be wrong, but we have `record` method on sensor where `PLUGIN_EXPORT
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1469784272 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( Review Comment: If we do want a simple `ClientMetricsUnknownSubscriptionRequestCount`, then I can drop the client instance id and the counter can simply track the number of unknown subscriptions. But if we do want to add a tag with the client software version, then metrics in JMX will have separate buckets under `ClientMetrics`, one per client software version, each with its own count. My reasoning for tagging with the client instance id was that all metrics for an instance fall into the same bucket in JMX. Also, as we already have client metrics that identify which client software version is associated with which client instance, we can find the software version anyway, if required. Please let me know what you think.
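To make the "buckets" trade-off above concrete, here is a stdlib-only sketch showing that each distinct tag set produces its own JMX-style name and therefore its own count. The `kafka.server` domain, the `domain:type=group,key=value` layout, and the tag keys `client_instance_id` and `client_software_version` are assumptions for illustration. `JmxBucketSketch` is a hypothetical helper, not Kafka's `JmxReporter`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of how tags could map to JMX buckets; the domain and the
// tag keys used with it are assumptions, not taken from the PR.
final class JmxBucketSketch {
    // Builds a name of the form domain:type=group,key=value,... from the tags.
    static String mbeanName(String domain, String group, Map<String, String> tags) {
        StringBuilder name = new StringBuilder(domain).append(":type=").append(group);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            name.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        return name.toString();
    }

    // Counts events per bucket: each distinct tag set gets its own name and count.
    static Map<String, Long> countPerBucket(List<Map<String, String>> taggedEvents) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map<String, String> tags : taggedEvents) {
            counts.merge(mbeanName("kafka.server", "ClientMetrics", tags), 1L, Long::sum);
        }
        return counts;
    }
}
```

Tagging by instance id gives one bucket per client instance. Tagging by software version collapses all instances running the same version into a single bucket with a combined count.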
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465945556 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, Review Comment: Yes. I anticipate there will be a KIP correction needed here. Once we're happy with the PR, I'll pull together a correction.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465454670 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); +unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); +sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + +Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); +throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); +sensorsName.add(throttleCount.name()); + +Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); +pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); +sensorsName.add(pluginExportCount.name()); + +Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); +pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); +sensorsName.add(pluginErrorCount.name()); + +Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", +ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", +ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); +sensorsName.add(pluginExportTime.name()); +} + +public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { +record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); +} + +public 
void recordThrottleCount(Uuid clientInstanceId) { +record(THROTTLE, clientInstanceId); +} + +public void recordPluginExport(Uuid clientInstanceId, long timeMs) { +record(PLUGIN_EXPORT, clientInstanceId); +record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); +} + +public void recordPluginErrorCount(Uuid clientInstanceId) { +
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465476918 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, Review Comment: The KIP includes a tag for software version. Should we correct the KIP?
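For context on the instance-count metric discussed above: in the diff it is a `Measurable` lambda, `(config, now) -> clientInstanceCache.size()`, registered with `metrics.addMetric` rather than through a sensor. The stdlib-only sketch below illustrates that read-time gauge pattern. `CacheSizeGauge` is a hypothetical name, and `LongSupplier` stands in for Kafka's `Measurable`.

```java
import java.util.function.LongSupplier;

// Hypothetical gauge mirroring the diff's Measurable lambda: the value is computed
// when the metric is read, so nothing has to call record() when the cache changes.
final class CacheSizeGauge {
    private final LongSupplier source;

    CacheSizeGauge(LongSupplier source) {
        this.source = source;
    }

    // Evaluated on every read, analogous to Measurable.measure(config, now).
    long read() {
        return source.getAsLong();
    }
}
```

Because the value is derived from the cache at read time, adding or expiring an instance is reflected without any extra bookkeeping.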
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465492172 ## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ## @@ -1023,5 +1177,18 @@ public void testCacheExpirationTaskCancelledOnInstanceUpdate() throws UnknownHos assertNotNull(instance); assertNotNull(instance.expirationTimerTask()); assertEquals(1, clientMetricsManager.expirationTimer().size()); +// Metrics size should remain same on instance update. +assertEquals(12, kafkaMetrics.metrics().size()); +assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue()); +} + +private KafkaMetric getMetric(String name) throws Exception { +Optional<Map.Entry<MetricName, KafkaMetric>> metric = kafkaMetrics.metrics().entrySet().stream() Review Comment: Hmm, why do we need to iterate the whole map instead of doing a `get` based on name on the map?
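The question above hinges on the map key: a direct `get` works only if the test can rebuild a key that is equal to the registered one. In Kafka, `Metrics.metrics()` returns a `Map<MetricName, KafkaMetric>`, and `MetricName` equality is, to my understanding, based on name, group and tags (not the description). So something like `kafkaMetrics.metrics().get(kafkaMetrics.metricName(name, "ClientMetrics"))` should work, assuming no extra default tags are configured. The stdlib-only sketch below contrasts the two lookups. `MetricKey` and `MetricLookup` are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical key type. Kafka's MetricName is assumed here to use name, group
// and tags (not the description) in equals/hashCode, which is what makes get() work.
final class MetricKey {
    final String name;
    final String group;
    final Map<String, String> tags;

    MetricKey(String name, String group, Map<String, String> tags) {
        this.name = name;
        this.group = group;
        this.tags = Map.copyOf(tags);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MetricKey)) return false;
        MetricKey other = (MetricKey) o;
        return name.equals(other.name) && group.equals(other.group) && tags.equals(other.tags);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, group, tags);
    }
}

final class MetricLookup {
    // O(n) scan by name only, similar in spirit to the stream in the test helper.
    // With per-instance tags, several entries can share a name, so the match can be ambiguous.
    static Optional<Double> byScan(Map<MetricKey, Double> metrics, String name) {
        return metrics.entrySet().stream()
                .filter(e -> e.getKey().name.equals(name))
                .map(Map.Entry::getValue)
                .findFirst();
    }

    // O(1) hash lookup using the complete key.
    static Double byKey(Map<MetricKey, Double> metrics, String name, String group, Map<String, String> tags) {
        return metrics.get(new MetricKey(name, group, tags));
    }
}
```

The keyed lookup is both cheaper and stricter: a metric registered with different tags is not matched by accident.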
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465474347 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( Review Comment: The KIP includes a tag with client software version. Is that correct?
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465446342 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); +unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); +sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + +Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); +throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); +sensorsName.add(throttleCount.name()); + +Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); +pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); +sensorsName.add(pluginExportCount.name()); + +Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); +pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); +sensorsName.add(pluginErrorCount.name()); + +Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", +ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", +ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); +sensorsName.add(pluginExportTime.name()); +} + +public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { +record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); +} + +public 
void recordThrottleCount(Uuid clientInstanceId) { +record(THROTTLE, clientInstanceId); +} + +public void recordPluginExport(Uuid clientInstanceId, long timeMs) { +record(PLUGIN_EXPORT, clientInstanceId); +record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); Review Comment: Since these two metrics are derived from the same source, they can be on the same
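Both plugin-export metrics are fed by the same event: one `exportMetrics` call and its duration. The diff already attaches both an `Avg` and a `Max` stat to the single `PLUGIN_EXPORT_TIME` sensor. The stdlib-only sketch below shows that pattern extended with a count stat, so that one `record(timeMs)` call updates all three. Every type here (`StatSketch`, `CountStat`, `AvgStat`, `MaxStat`, `MultiStatSensor`) is a hypothetical, unwindowed stand-in and not Kafka's `Sensor`, `WindowedCount`, `Avg` or `Max`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stat abstraction; Kafka's real stats are windowed, these are not.
interface StatSketch {
    void record(double value);
    double measure();
}

final class CountStat implements StatSketch {
    private long count;
    public void record(double value) { count++; }
    public double measure() { return count; }
}

final class AvgStat implements StatSketch {
    private double sum;
    private long count;
    public void record(double value) { sum += value; count++; }
    // NaN until at least one value has been recorded.
    public double measure() { return count == 0 ? Double.NaN : sum / count; }
}

final class MaxStat implements StatSketch {
    private double max = Double.NaN;
    public void record(double value) { max = Double.isNaN(max) ? value : Math.max(max, value); }
    public double measure() { return max; }
}

// One sensor fans a single recorded value out to every attached stat.
final class MultiStatSensor {
    private final List<StatSketch> stats = new ArrayList<>();

    MultiStatSensor add(StatSketch stat) {
        stats.add(stat);
        return this;
    }

    void record(double value) {
        for (StatSketch stat : stats) {
            stat.record(value);
        }
    }
}
```

Recording export times of 10 and 30 ms yields a count of 2, an average of 20 and a maximum of 30, all from one recording point.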
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465497737

## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ##

```diff
@@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
         // Create new client metrics manager which simulates a new server as it will not have any
         // last request information but request should succeed as subscription id should match
         // the one with new client instance.
-
-        ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+        kafkaMetrics = new Metrics();
```

Review Comment:
We create a new instance during setup already. Do we need to construct a new instance here? Ditto in a few other places.

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##

```diff
@@ -191,8 +206,11 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re
         byte[] metrics = request.data().metrics();
         if (metrics != null && metrics.length > 0) {
             try {
+                long exportTimeStartMs = time.milliseconds();
```

Review Comment:
If we just want to measure the amount of time passed, `time.hiResClockMs` is cheaper.

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##

```diff
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);
```

Review Comment:

> Total number of metrics requests GetTelemetrySubscriptionsRequests with unknown CLIENT_INSTANCE_IDs.

The above is the description of the metric in the KIP. It seems inaccurate since it should be for unknown subscriptionId instead of CLIENT_INSTANCE_ID. Also, we track it for PushTelemetryRequest instead of GetTelemetrySubscriptionsRequest.
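The `hiResClockMs` suggestion is about measuring elapsed time rather than reading wall-clock time. As we understand it, Kafka's `Time.hiResClockMs()` is derived from the monotonic nanosecond clock. The sketch below illustrates the idea with only the JDK: it times a hypothetical export action with a millisecond clock built on `System.nanoTime()` and feeds the duration into Avg/Max statistics like the `ClientMetricsPluginExportTimeAvg`/`Max` pair described in the PR. `ExportTimingSketch`, `ExportTimeStats` and `timeExport` are illustrative names, not Kafka APIs.

```java
import java.util.concurrent.TimeUnit;

// Sketch: time a (hypothetical) plugin export with a monotonic clock and keep
// Avg/Max statistics, in the spirit of ClientMetricsPluginExportTimeAvg/Max.
class ExportTimingSketch {

    // Hypothetical stand-in for an Avg/Max stat; this is not Kafka's Sensor API.
    static final class ExportTimeStats {
        private long count;
        private long totalMs;
        private long maxMs;

        // Records one observed duration in milliseconds.
        synchronized void record(long elapsedMs) {
            count++;
            totalMs += elapsedMs;
            maxMs = Math.max(maxMs, elapsedMs);
        }

        synchronized long count() {
            return count;
        }

        // Average duration, or NaN before anything has been recorded.
        synchronized double avgMs() {
            return count == 0 ? Double.NaN : (double) totalMs / count;
        }

        synchronized long maxMs() {
            return maxMs;
        }
    }

    // Monotonic millisecond clock derived from System.nanoTime(); unlike
    // System.currentTimeMillis(), it is unaffected by wall-clock adjustments.
    static long hiResClockMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    // Runs the export and records its duration, including when the export throws.
    static void timeExport(ExportTimeStats stats, Runnable export) {
        long startMs = hiResClockMs();
        try {
            export.run();
        } finally {
            stats.record(hiResClockMs() - startMs);
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExportTimeStats stats = new ExportTimeStats();
        timeExport(stats, () -> sleepQuietly(20));
        timeExport(stats, () -> sleepQuietly(5));
        System.out.printf("count=%d avg=%.1fms max=%dms%n", stats.count(), stats.avgMs(), stats.maxMs());
    }
}
```

Recording the duration in a `finally` block is a choice made for this sketch only; whether failed exports should contribute to the export-time metric is not settled in the thread.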
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
AndrewJSchofield commented on PR #15251: URL: https://github.com/apache/kafka/pull/15251#issuecomment-1907905085 I see that you have a few differences from the KIP. It sounds like you're highlighting situations in which the KIP deviates slightly from normal Kafka practice. I would support adjusting the KIP to match. The broker software version tag doesn't seem useful; there is already another metric for that specific information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1464735557

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##

```diff
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
```

Review Comment:
This one should not include the client instance ID. The KIP is expecting a simple `ClientMetricsUnknownSubscriptionRequestCount`, I believe. As such, I don't think it's an instance metric in this case.

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##

```diff
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
```

Review Comment:
I think this is really a typo in the KIP that was copied into the code; I would say "client metrics instances".
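The review separates two kinds of metric: counters registered once per client instance (tagged with `client_instance_id`) and a single broker-wide `ClientMetricsUnknownSubscriptionRequestCount`, alongside the instance-count gauge that is computed on read. A minimal JDK-only sketch of that split follows. It assumes `java.util.UUID` and `LongAdder` as stand-ins for Kafka's `Uuid` and sensors, and the class and method names are hypothetical. Registration uses `ConcurrentHashMap.computeIfAbsent`, so in this sketch it is atomic and idempotent; the PR itself relies on Kafka's `Metrics` registry instead.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Sketch of the split suggested in review: per-instance counters keyed by a client
// instance id, plus a broker-wide unknown-subscription counter and an instance-count gauge.
class ClientMetricsRegistrySketch {

    // Counters that would carry a client_instance_id tag.
    static final class InstanceCounters {
        final LongAdder throttle = new LongAdder();
        final LongAdder pluginExport = new LongAdder();
        final LongAdder pluginError = new LongAdder();
    }

    // Broker-wide counter: no client_instance_id tag.
    private final LongAdder unknownSubscriptionRequests = new LongAdder();
    private final Map<UUID, InstanceCounters> perInstance = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the manager's clientInstanceCache.
    private final Map<UUID, ?> clientInstanceCache;

    ClientMetricsRegistrySketch(Map<UUID, ?> clientInstanceCache) {
        this.clientInstanceCache = clientInstanceCache;
    }

    // Idempotent: computeIfAbsent creates the counters at most once per instance id.
    InstanceCounters maybeAddClientInstanceMetrics(UUID clientInstanceId) {
        return perInstance.computeIfAbsent(clientInstanceId, id -> new InstanceCounters());
    }

    void recordUnknownSubscriptionRequest() {
        unknownSubscriptionRequests.increment();
    }

    long unknownSubscriptionRequestCount() {
        return unknownSubscriptionRequests.sum();
    }

    // Gauge: evaluated on every read, like (config, now) -> clientInstanceCache.size().
    int instanceCount() {
        return clientInstanceCache.size();
    }

    int registeredInstanceCount() {
        return perInstance.size();
    }

    public static void main(String[] args) {
        Map<UUID, Object> cache = new ConcurrentHashMap<>();
        ClientMetricsRegistrySketch registry = new ClientMetricsRegistrySketch(cache);
        UUID id = UUID.randomUUID();
        cache.put(id, new Object());
        registry.maybeAddClientInstanceMetrics(id).pluginExport.increment();
        registry.maybeAddClientInstanceMetrics(id).pluginExport.increment(); // same counters reused
        registry.recordUnknownSubscriptionRequest();
        System.out.println("instances=" + registry.instanceCount()
            + " registered=" + registry.registeredInstanceCount()
            + " exports=" + registry.maybeAddClientInstanceMetrics(id).pluginExport.sum()
            + " unknownSubscriptionRequests=" + registry.unknownSubscriptionRequestCount());
        // prints: instances=1 registered=1 exports=2 unknownSubscriptionRequests=1
    }
}
```

Keeping the unknown-subscription counter outside the per-instance map means it exists even for requests whose instance is not (or no longer) tracked, which matches the reviewer's point that it is not an instance metric.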
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on PR #15251: URL: https://github.com/apache/kafka/pull/15251#issuecomment-1907853039 @AndrewJSchofield @junrao Could you please review and provide feedback?
[PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #15251: URL: https://github.com/apache/kafka/pull/15251

KIP-714 defines broker metrics [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metrics), which should help determine the client instance metrics managed by the broker. However, there are certain differences between the metrics defined in the KIP and the implementation, highlighted below. We can discuss and either amend the KIP or the implementation, though I prefer the former.

There are two ways to emit metrics in the broker: Kafka Metrics or Yammer Metrics. I have used Kafka Metrics, as I want to have minimal dependency on Yammer (though I am open to suggestions).

| **Changes/Query** | Metric Name | Type | Group | Tags |
| -- | -- | -- | -- | -- |
| What should be the name of the tag for the broker's version, and how can we fetch it, as I do not see any placeholder in RequestContext? | ClientMetricsInstanceCount | Gauge | ClientMetrics | version: broker's software version |
| The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsPluginErrorCount` and `ClientMetricsPluginErrorRate` | ClientMetricsPluginErrorCount | Meter | ClientMetrics | client_instance_id, reason (reason for the failure) |
| The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsPluginExportCount` and `ClientMetricsPluginExportRate` | ClientMetricsPluginExportCount | Meter | ClientMetrics | client_instance_id |
| The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsThrottleCount` and `ClientMetricsThrottleRate` | ClientMetricsThrottleCount | Meter | ClientMetrics | client_instance_id |
| The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsUnknownSubscriptionRequestCount` and `ClientMetricsUnknownSubscriptionRequestRate` | ClientMetricsUnknownSubscriptionRequestCount | Meter | ClientMetrics | client version: client's software version |
| The metric is defined as a Histogram, but we generally use `Avg` and `Max` metrics to capture `latency`-related metrics (which require a Histogram), hence I have 2 metrics: `ClientMetricsPluginExportTimeAvg` and `ClientMetricsPluginExportTimeMax` | ClientMetricsPluginExportTime | Histogram | ClientMetrics | client_instance_id |

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
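The table describes each KIP "Meter" being emitted as two Kafka metrics: a cumulative `...Count` and a `...Rate`. As we understand it, Kafka Metrics' own `Meter` stat does this with windowed samples; the JDK-only sketch below uses a simpler trailing-window rate just to show how one stream of events yields both values. `MeterSketch`, `record` and `ratePerSec` are hypothetical names, and timestamps are passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of a meter exposed as two metrics: a cumulative count
// (e.g. ClientMetricsPluginErrorCount) and a trailing-window rate
// (e.g. ClientMetricsPluginErrorRate). Not Kafka's Meter/Rate implementation.
class MeterSketch {
    private final long windowMs;
    private final Deque<Long> eventTimesMs = new ArrayDeque<>();
    private long total;

    MeterSketch(long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        this.windowMs = windowMs;
    }

    // Records one event at nowMs (timestamps are assumed non-decreasing).
    synchronized void record(long nowMs) {
        total++;
        eventTimesMs.addLast(nowMs);
        expire(nowMs);
    }

    // Cumulative count: never decreases.
    synchronized long count() {
        return total;
    }

    // Events per second over the trailing window (nowMs - windowMs, nowMs].
    synchronized double ratePerSec(long nowMs) {
        expire(nowMs);
        return eventTimesMs.size() * 1000.0 / windowMs;
    }

    // Drops events that have fallen out of the trailing window.
    private void expire(long nowMs) {
        while (!eventTimesMs.isEmpty() && eventTimesMs.peekFirst() <= nowMs - windowMs) {
            eventTimesMs.pollFirst();
        }
    }

    public static void main(String[] args) {
        MeterSketch pluginErrors = new MeterSketch(1000);
        pluginErrors.record(0);
        pluginErrors.record(100);
        pluginErrors.record(500);
        System.out.println("count=" + pluginErrors.count() + " rate=" + pluginErrors.ratePerSec(500));
        // prints: count=3 rate=3.0
    }
}
```

The count keeps growing for the lifetime of the meter, while the rate falls back to zero once no events remain in the window, which is why both values are useful as separate metrics.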