junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472
##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -288,6 +307,9 @@ private ClientMetricsInstance
createClientInstanceAndUpdateCache(Uuid clientInst
ClientMetricsInstanceMetadata instanceMetadata) {
ClientMetricsInstance clientInstance =
createClientInstance(clientInstanceId, instanceMetadata);
+ // Maybe add client metrics, if metrics not already added. Metrics might be already added
+ // if the client instance was evicted from the cache because of size limit.
Review Comment:
Hmm, if we evict a client instance from the LRU cache, should we remove the
corresponding metrics?
##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -493,4 +519,123 @@ public void run() {
}
}
}
+
+ // Visible for testing
+ final class ClientMetricsStats {
+
+ private static final String GROUP_NAME = "ClientMetrics";
+
+ // Visible for testing
+ static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+ static final String UNKNOWN_SUBSCRIPTION_REQUEST =
"ClientMetricsUnknownSubscriptionRequest";
+ static final String THROTTLE = "ClientMetricsThrottle";
+ static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+ static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+ static final String PLUGIN_EXPORT_TIME =
"ClientMetricsPluginExportTime";
+
+ // Names of sensors that are registered through client instances.
+ private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+ // List of metric names which are not specific to a client instance. Do not require thread
+ // safe structure as it will be populated only in constructor.
+ private final List<MetricName> registeredMetricNames = new
ArrayList<>();
+
+ private final Set<String> instanceMetrics =
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+ THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR,
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ ClientMetricsStats() {
+ Measurable instanceCount = (config, now) ->
clientInstanceCache.size();
+ MetricName instanceCountMetric =
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+ "The current number of client metric instances being managed
by the broker");
+ metrics.addMetric(instanceCountMetric, instanceCount);
+ registeredMetricNames.add(instanceCountMetric);
+ }
+
+ public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+ // If one sensor of the metrics has been registered for the client
instance,
+ // then all other sensors should have been registered; and vice
versa.
+ if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) !=
null) {
+ return;
+ }
+
+ Map<String, String> tags =
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID,
clientInstanceId.toString());
+
+ Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" +
clientInstanceId);
+ unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new
WindowedCount(),
+ ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+ sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+ Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE
+ "-" + clientInstanceId);
+ throttleCount.add(createMeter(metrics, new WindowedCount(),
ClientMetricsStats.THROTTLE, tags));
+ sensorsName.add(throttleCount.name());
+
+ Sensor pluginExportCount =
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+ pluginExportCount.add(createMeter(metrics, new WindowedCount(),
ClientMetricsStats.PLUGIN_EXPORT, tags));
+ sensorsName.add(pluginExportCount.name());
+
+ Sensor pluginErrorCount =
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+ pluginErrorCount.add(createMeter(metrics, new WindowedCount(),
ClientMetricsStats.PLUGIN_ERROR, tags));
+ sensorsName.add(pluginErrorCount.name());
+
+ Sensor pluginExportTime =
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME +
"Avg",
+ ClientMetricsStats.GROUP_NAME, "Average time broker spent in
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME +
"Max",
+ ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in
invoking plugin exportMetrics call", tags), new Max());
+ sensorsName.add(pluginExportTime.name());
+ }
+
+ public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+ record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+ }
+
+ public void recordThrottleCount(Uuid clientInstanceId) {
+ record(THROTTLE, clientInstanceId);
+ }
+
+ public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+ record(PLUGIN_EXPORT, clientInstanceId);
+ record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);
Review Comment:
> I might be wrong, but we have record method on sensor where PLUGIN_EXPORT
> sensor records a value of 1 on every invocation but PLUGIN_EXPORT_TIME needs to
> record time

Hmm, but that's what WindowedCount does. It always records a value of 1
independent of the passed-in value. The passed-in value can then be used to
calculate avg and max.
```java
public class WindowedCount extends WindowedSum {
    @Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
        super.update(sample, config, 1.0, now);
    }
}
```
##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -299,8 +332,8 @@ public void
testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
// Create new client metrics manager which simulates a new server as
it will not have any
// last request information but request should succeed as subscription
id should match
// the one with new client instance.
-
- ClientMetricsManager newClientMetricsManager = new
ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+ kafkaMetrics = new Metrics();
Review Comment:
Then, should we close the new `kafkaMetrics` at the end of the test? Also,
could we make the new `kafkaMetrics` a local variable? Otherwise, the old one
won't be closed. Finally, should we close `newClientMetricsManager` at the end
of the test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]