AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1464735557


##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   This one should not include the client instance ID. I believe the KIP 
expects a simple `ClientMetricsUnknownSubscriptionRequestCount`, so I don't 
think it is an instance metric in this case.
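
   To illustrate the split I have in mind, here is a rough sketch of the 
naming logic only. It is not the actual change: `SensorNamingSketch`, 
`BROKER_SENSORS`, `INSTANCE_SENSORS` and `sensorNamesFor` are hypothetical 
names, and the sketch deliberately does not touch Kafka's `Metrics` class. It 
assumes the unknown-subscription sensor is registered once for the broker, 
while the other four keep the `-<clientInstanceId>` suffix used in the diff.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the sensor naming only; no Kafka classes are used.
final class SensorNamingSketch {
    static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
    static final String THROTTLE = "ClientMetricsThrottle";
    static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
    static final String PLUGIN_ERROR = "ClientMetricsPluginError";
    static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";

    // Assumption: broker-wide sensors carry no client instance ID in name or tags.
    static final List<String> BROKER_SENSORS = List.of(UNKNOWN_SUBSCRIPTION_REQUEST);

    // Per-instance sensors get one copy per client instance ID.
    static final List<String> INSTANCE_SENSORS =
        List.of(THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME);

    // Returns every sensor name that would exist once the given client instance is known.
    static Set<String> sensorNamesFor(String clientInstanceId) {
        Set<String> names = new LinkedHashSet<>(BROKER_SENSORS);
        for (String base : INSTANCE_SENSORS) {
            names.add(base + "-" + clientInstanceId);
        }
        return names;
    }
}
```

   With this split, a second client instance adds four sensor names rather 
than five, because the unknown-subscription sensor is shared.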



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");

Review Comment:
   I think this is a typo in the KIP really, copied to the code, but I would 
say "client metrics instances".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
