apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1469843003
########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -493,4 +519,123 @@ public void run() { } } } + + // Visible for testing + final class ClientMetricsStats { + + private static final String GROUP_NAME = "ClientMetrics"; + + // Visible for testing + static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; + static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; + static final String THROTTLE = "ClientMetricsThrottle"; + static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; + static final String PLUGIN_ERROR = "ClientMetricsPluginError"; + static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + + // Names of sensors that are registered through client instances. + private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); + // List of metric names which are not specific to a client instance. Do not require thread + // safe structure as it will be populated only in constructor. + private final List<MetricName> registeredMetricNames = new ArrayList<>(); + + private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, + THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + + ClientMetricsStats() { + Measurable instanceCount = (config, now) -> clientInstanceCache.size(); + MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, + "The current number of client metric instances being managed by the broker"); + metrics.addMetric(instanceCountMetric, instanceCount); + registeredMetricNames.add(instanceCountMetric); + } + + public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { + // If one sensor of the metrics has been registered for the client instance, + // then all other sensors should have been registered; and vice versa. 
+ if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { + return; + } + + Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + + Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); Review Comment: Yes, this needs to be corrected in the KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org