junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465497737


##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
         // Create new client metrics manager which simulates a new server as it will not have any
         // last request information but request should succeed as subscription id should match
         // the one with new client instance.
-
-        ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+        kafkaMetrics = new Metrics();

Review Comment:
   We already create a new `Metrics` instance during setup. Do we need to
   construct another one here? Ditto in a few other places.



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -191,8 +206,11 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re
         byte[] metrics = request.data().metrics();
         if (metrics != null && metrics.length > 0) {
             try {
+                long exportTimeStartMs = time.milliseconds();

Review Comment:
   If we just want to measure elapsed time, `time.hiResClockMs()` is cheaper.
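
   A minimal sketch of the suggestion, not the PR's actual code. `Clock` is a
   hypothetical stand-in for Kafka's `Time` interface so the example runs
   without Kafka on the classpath, and `timeExportMs` and `ElapsedTimeSketch`
   are illustrative names. It assumes `hiResClockMs()` is the nanosecond clock
   converted to milliseconds, which also makes the measured interval
   insensitive to wall-clock adjustments.

```java
import java.util.concurrent.TimeUnit;

public class ElapsedTimeSketch {

    // Hypothetical stand-in for org.apache.kafka.common.utils.Time.
    public interface Clock {
        long nanoseconds();

        // Monotonic clock reading expressed in milliseconds.
        default long hiResClockMs() {
            return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
        }
    }

    // System-backed clock based on the JVM's monotonic nanosecond timer.
    public static final Clock SYSTEM = System::nanoTime;

    // Runs the export action and returns how long it took in milliseconds.
    public static long timeExportMs(Clock clock, Runnable export) {
        long exportTimeStartMs = clock.hiResClockMs();
        export.run();
        return clock.hiResClockMs() - exportTimeStartMs;
    }

    public static void main(String[] args) {
        long elapsedMs = timeExportMs(SYSTEM, () -> {
            try {
                Thread.sleep(20); // placeholder for the plugin export call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("export took " + elapsedMs + " ms");
    }
}
```

   Only the difference between the two readings is meaningful here; the
   absolute value of `hiResClockMs()` is not a wall-clock timestamp.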



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);

Review Comment:
   > Total number of metrics requests GetTelemetrySubscriptionsRequests with unknown CLIENT_INSTANCE_IDs.
   
   The above is the description of the metric in the KIP. It seems inaccurate,
   since the metric should count requests with an unknown subscriptionId
   rather than an unknown CLIENT_INSTANCE_ID. Also, we track it for
   PushTelemetryRequest instead of GetTelemetrySubscriptionsRequest.



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);
+            unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+            sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+            Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
+            throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));
+            sensorsName.add(throttleCount.name());
+
+            Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+            pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags));
+            sensorsName.add(pluginExportCount.name());
+
+            Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+            pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags));

Review Comment:
   The KIP also includes a reason tag. Do we still need that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
