junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1465497737
########## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ########## @@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno // Create new client metrics manager which simulates a new server as it will not have any // last request information but request should succeed as subscription id should match // the one with new client instance. - - ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); + kafkaMetrics = new Metrics(); Review Comment: We create a new instance during setup already. Do we need to construct a new instance here? Ditto in a few other places. ########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -191,8 +206,11 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re byte[] metrics = request.data().metrics(); if (metrics != null && metrics.length > 0) { try { + long exportTimeStartMs = time.milliseconds(); Review Comment: If we just want to measure the amount of time passed, `time.hiResClockMs` is cheaper. ########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -493,4 +519,123 @@ public void run() { } } } + + // Visible for testing + final class ClientMetricsStats { + + private static final String GROUP_NAME = "ClientMetrics"; + + // Visible for testing + static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; + static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; + static final String THROTTLE = "ClientMetricsThrottle"; + static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; + static final String PLUGIN_ERROR = "ClientMetricsPluginError"; + static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + + // Names of sensors that are registered through client instances. + private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); + // List of metric names which are not specific to a client instance. Do not require thread + // safe structure as it will be populated only in constructor. + private final List<MetricName> registeredMetricNames = new ArrayList<>(); + + private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, + THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + + ClientMetricsStats() { + Measurable instanceCount = (config, now) -> clientInstanceCache.size(); + MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, + "The current number of client metric instances being managed by the broker"); + metrics.addMetric(instanceCountMetric, instanceCount); + registeredMetricNames.add(instanceCountMetric); + } + + public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { + // If one sensor of the metrics has been registered for the client instance, + // then all other sensors should have been registered; and vice versa. + if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { + return; + } + + Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + + Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); Review Comment: > Total number of metrics requests GetTelemetrySubscriptionsRequests with unknown CLIENT_INSTANCE_IDs. The above is the description of the metric in the KIP. It seems inaccurate since it should be for unknown subscriptionId instead of CLIENT_INSTANCE_ID. Also, we track it for PushTelemetryRequest instead of GetTelemetrySubscriptionsRequest. ########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -493,4 +519,123 @@ public void run() { } } } + + // Visible for testing + final class ClientMetricsStats { + + private static final String GROUP_NAME = "ClientMetrics"; + + // Visible for testing + static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; + static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; + static final String THROTTLE = "ClientMetricsThrottle"; + static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; + static final String PLUGIN_ERROR = "ClientMetricsPluginError"; + static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + + // Names of sensors that are registered through client instances. + private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); + // List of metric names which are not specific to a client instance. Do not require thread + // safe structure as it will be populated only in constructor. + private final List<MetricName> registeredMetricNames = new ArrayList<>(); + + private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, + THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + + ClientMetricsStats() { + Measurable instanceCount = (config, now) -> clientInstanceCache.size(); + MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, + "The current number of client metric instances being managed by the broker"); + metrics.addMetric(instanceCountMetric, instanceCount); + registeredMetricNames.add(instanceCountMetric); + } + + public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { + // If one sensor of the metrics has been registered for the client instance, + // then all other sensors should have been registered; and vice versa. + if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { + return; + } + + Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + + Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); + unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); + sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + + Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); + throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); + sensorsName.add(throttleCount.name()); + + Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); + pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); + sensorsName.add(pluginExportCount.name()); + + Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); + pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); Review Comment: The KIP also includes a reason tag. Do we still need that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org