junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465454670
########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -493,4 +519,123 @@ public void run() { } } } + + // Visible for testing + final class ClientMetricsStats { + + private static final String GROUP_NAME = "ClientMetrics"; + + // Visible for testing + static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; + static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; + static final String THROTTLE = "ClientMetricsThrottle"; + static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; + static final String PLUGIN_ERROR = "ClientMetricsPluginError"; + static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + + // Names of sensors that are registered through client instances. + private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); + // List of metric names which are not specific to a client instance. Do not require thread + // safe structure as it will be populated only in constructor. + private final List<MetricName> registeredMetricNames = new ArrayList<>(); + + private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, + THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + + ClientMetricsStats() { + Measurable instanceCount = (config, now) -> clientInstanceCache.size(); + MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, + "The current number of client metric instances being managed by the broker"); + metrics.addMetric(instanceCountMetric, instanceCount); + registeredMetricNames.add(instanceCountMetric); + } + + public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { + // If one sensor of the metrics has been registered for the client instance, + // then all other sensors should have been registered; and vice versa. + if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { + return; + } + + Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + + Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); + unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); + sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + + Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); + throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); + sensorsName.add(throttleCount.name()); + + Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); + pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); + sensorsName.add(pluginExportCount.name()); + + Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); + pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); + sensorsName.add(pluginErrorCount.name()); + + Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", + ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", + ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); + sensorsName.add(pluginExportTime.name()); + } + + public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { + record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); + } + + public void recordThrottleCount(Uuid clientInstanceId) { + record(THROTTLE, clientInstanceId); + } + + public void recordPluginExport(Uuid clientInstanceId, long timeMs) { + record(PLUGIN_EXPORT, clientInstanceId); + record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); + } + + public void recordPluginErrorCount(Uuid clientInstanceId) { + record(PLUGIN_ERROR, clientInstanceId); + } + + public void unregisterClientInstanceMetrics(Uuid clientInstanceId) { + for (String name : instanceMetrics) { + String sensorName = name + "-" + clientInstanceId; + metrics.removeSensor(sensorName); + sensorsName.remove(sensorName); + } + } + + public void unregisterMetrics() { + for (MetricName metricName : registeredMetricNames) { + metrics.removeMetric(metricName); + } + for (String name : sensorsName) { + metrics.removeSensor(name); Review Comment: Should we remove it from `sensorsName` too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org