apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479915


##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
     private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+    private final Time time;
+
+    // The latest subscription version is used to determine if subscription 
has changed and needs
+    // to re-evaluate the client instance subscription id as per changed 
subscriptions.
+    private final AtomicInteger subscriptionUpdateVersion;
+
+    private ClientMetricsManager() {
+        this(Time.SYSTEM);
+    }
+
+    // Visible for testing
+    ClientMetricsManager(Time time) {
+        this.subscriptionMap = new ConcurrentHashMap<>();
+        this.subscriptionUpdateVersion = new AtomicInteger(0);
+        this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+        this.time = time;
+    }
 
     public void updateSubscription(String subscriptionName, Properties 
properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // Validate the subscription properties.
+        ClientMetricsConfigs.validate(subscriptionName, properties);
+        // IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+        // for respective subscription. In that case, we need to remove the 
subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore 
the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                this.subscriptionUpdateVersion.incrementAndGet();
+            }
+            return;
+        }
+
+        updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+    }
+
+    public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+        long now = time.milliseconds();
+        Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If 
subscription has changed
+         since the last request, then the client instance will be 
re-evaluated. Validation of the
+         request will be done after the client instance is created. If client 
issues another get
+         telemetry request prior to push interval, then the client should get 
a throttle error but if
+         the subscription has changed since the last request then the client 
should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+            String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+        }
+
+        long now = time.milliseconds();
+        ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, telemetryMaxBytes, clientInstance, 
now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client 
[{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            // Update the client instance with the latest push request 
parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                
ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(throttleMs, 
Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new 
PushTelemetryResponseData().setThrottleTimeMs(throttleMs));
     }
 
     @Override
     public void close() throws IOException {
-        // TODO: Implement the close logic to close the client metrics manager.
+        subscriptionMap.clear();
+    }
+
+    private void updateClientSubscription(String subscriptionName, 
ClientMetricsConfigs configs) {
+        List<String> metrics = 
configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
+        int pushInterval = 
configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
+        List<String> clientMatchPattern = 
configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+
+        /*
+         Update last subscription updated time to current time to indicate 
that there is a change
+         in the subscription. This will be used to determine if the next 
telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        int version = this.subscriptionUpdateVersion.incrementAndGet();

Review Comment:
   Corrected it, `this` should be used at places where there is a local 
variable with same name else directly instance variables should be accessed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to