apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391660088


##########
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+    private static final List<ClientTelemetryReceiver> RECEIVERS = new 
ArrayList<>();

Review Comment:
   Done.



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next 
telemetry request needs
+    // to re-evaluate the subscription id as per the changed subscriptions.
+    private long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+    }
 
     public void updateSubscription(String subscriptionName, Properties 
properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+        // for respective subscription. In that case, we need to remove the 
subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore 
the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate 
that there is a change
+         in the subscription. This will be used to determine if the next 
telemetry request needs
+         to re-evaluate the subscription id as per the changed subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If 
subscription has changed
+         since the last request, then the client instance will be 
re-evaluated. Validation of the
+         request will be done after the client instance is created. If client 
issued get telemetry
+         request prior to push interval, then the client should get a throttle 
error but if the
+         subscription has changed since the last request then the client 
should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            clientInstance.lastGetRequestEpoch(now);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+            String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+        }
+
+        long now = System.currentTimeMillis();
+        ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, telemetryMaxBytes, clientInstance);
+        } catch (ApiException exception) {
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            // Update the client instance with the latest push request 
parameters.
+            clientInstance.terminating(request.data().terminating());
+            clientInstance.lastPushRequestEpoch(now);
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            ClientMetricsReceiverPlugin.exportMetrics(requestContext, request);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to