apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1395204343


##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -30,17 +68,366 @@ public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
     private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
     public static ClientMetricsManager instance() {
         return INSTANCE;
     }
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+
+    // The last subscription updated time is used to determine if the next 
telemetry request needs
+    // to re-evaluate the subscription id as per changes subscriptions.
+    private volatile long lastSubscriptionUpdateEpoch;
+
+    // Visible for testing
+    ClientMetricsManager() {
+        subscriptionMap = new ConcurrentHashMap<>();
+        clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+    }
 
     public void updateSubscription(String subscriptionName, Properties 
properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+        // for respective subscription. In that case, we need to remove the 
subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore 
the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                updateLastSubscriptionUpdateEpoch();
+            }
+            return;
+        }
+
+        ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+        updateClientSubscription(subscriptionName, configs);
+        /*
+         Update last subscription updated time to current time to indicate 
that there is a change
+         in the subscription. This will be used to determine if the next 
telemetry request needs
+         to re-evaluate the subscription id as per changes subscriptions.
+        */
+        updateLastSubscriptionUpdateEpoch();
+    }
+
+    public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+        long now = System.currentTimeMillis();
+        Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If 
subscription has changed
+         since the last request, then the client instance will be 
re-evaluated. Validation of the
+         request will be done after the client instance is created. If client 
issued get telemetry
+         request prior to push interval, then the client should get a throttle 
error but if the
+         subscription has changed since the last request then the client 
should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(throttleMs, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+    }
+
+    public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+        int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+            String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+        }
+
+        long now = System.currentTimeMillis();
+        ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext, now);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, telemetryMaxBytes, clientInstance, 
now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client 
[{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(throttleMs, exception);
+        } finally {
+            // Update the client instance with the latest push request 
parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                
ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(throttleMs, 
Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new 
PushTelemetryResponseData().setThrottleTimeMs(throttleMs));
     }
 
     @Override
     public void close() throws IOException {
-        // TODO: Implement the close logic to close the client metrics manager.
+        subscriptionMap.clear();
+    }
+
+    private synchronized void updateLastSubscriptionUpdateEpoch() {
+        this.lastSubscriptionUpdateEpoch = System.currentTimeMillis();
+    }
+
+    private void updateClientSubscription(String subscriptionName, 
ClientMetricsConfigs configs) {
+        List<String> metrics = 
configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
+        int pushInterval = 
configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
+        List<String> clientMatchPattern = 
configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+
+        SubscriptionInfo newSubscription =
+            new SubscriptionInfo(subscriptionName, metrics, pushInterval,
+                
ClientMetricsConfigs.parseMatchingPatterns(clientMatchPattern));
+
+        subscriptionMap.put(subscriptionName, newSubscription);
+    }
+
+    private Uuid generateNewClientId() {
+        Uuid id = Uuid.randomUuid();
+        while (clientInstanceCache.get(id) != null) {
+            id = Uuid.randomUuid();
+        }
+        return id;
+    }
+
+    private ClientMetricsInstance clientInstance(Uuid clientInstanceId, 
RequestContext requestContext,
+        long timestamp) {
+        ClientMetricsInstance clientInstance = 
clientInstanceCache.get(clientInstanceId);
+
+        if (clientInstance == null) {
+            /*
+             If the client instance is not present in the cache, then create a 
new client instance
+             and update the cache. This can also happen when the telemetry 
request is received by
+             the separate broker instance. Though cache is synchronized, but 
it is possible that
+             concurrent calls can create the same client instance. Hence, 
safeguard the client instance
+             creation with a double-checked lock to ensure that only one 
instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance != null) {
+                    return clientInstance;
+                }
+
+                ClientMetricsInstanceMetadata instanceMetadata = new 
ClientMetricsInstanceMetadata(
+                    clientInstanceId, requestContext);
+                clientInstance = 
createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata, 
timestamp);
+            }
+        } else if (clientInstance.subscriptionUpdateEpoch() < 
lastSubscriptionUpdateEpoch) {
+            /*
+             If the last subscription update time for client instance is older 
than the subscription
+             updated time, then re-evaluate the subscription information for 
the client as per the
+             updated subscriptions. This is to ensure that the client instance 
is always in sync with
+             the latest subscription information. Though cache is 
synchronized, but it is possible that
+             concurrent calls can create the same client instance. Hence, 
safeguard the client instance
+             update with a double-checked lock to ensure that only one 
instance is created.
+            */
+            synchronized (this) {
+                clientInstance = clientInstanceCache.get(clientInstanceId);
+                if (clientInstance.subscriptionUpdateEpoch() >= 
lastSubscriptionUpdateEpoch) {
+                    return clientInstance;
+                }
+                clientInstance = 
createClientInstanceAndUpdateCache(clientInstanceId, 
clientInstance.instanceMetadata(), timestamp);
+            }
+        }
+
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid 
clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata, long timestamp) {
+
+        ClientMetricsInstance clientInstance = 
createClientInstance(clientInstanceId, instanceMetadata,
+            timestamp);
+        clientInstanceCache.put(clientInstanceId, clientInstance);
+        return clientInstance;
+    }
+
+    private ClientMetricsInstance createClientInstance(Uuid clientInstanceId,
+        ClientMetricsInstanceMetadata instanceMetadata, long timestamp) {
+
+        int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
+        // Keep a set of metrics to avoid duplicates in case of overlapping 
subscriptions.
+        Set<String> subscribedMetrics = new HashSet<>();
+        boolean allMetricsSubscribed = false;
+
+        for (SubscriptionInfo info : subscriptions()) {
+            if (instanceMetadata.isMatch(info.matchPattern())) {
+                allMetricsSubscribed = allMetricsSubscribed || 
info.metrics().contains(
+                    ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+                subscribedMetrics.addAll(info.metrics());
+                pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
+            }
+        }
+
+        /*
+         If client matches with any subscription that has empty metrics 
string, then it means that client
+         is subscribed to all the metrics, so just send the empty string as 
the subscribed metrics.
+        */
+        if (allMetricsSubscribed) {
+            subscribedMetrics.clear();
+            // Add an empty string to indicate that all metrics are subscribed.
+            subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+        }
+
+        int subscriptionId = computeSubscriptionId(subscribedMetrics, 
pushIntervalMs, clientInstanceId);
+
+        return new ClientMetricsInstance(clientInstanceId, instanceMetadata, 
subscriptionId, timestamp,
+            subscribedMetrics, pushIntervalMs);
+    }
+
+    /**
+     * Computes the SubscriptionId as a unique identifier for a client 
instance's subscription set,
+     * the id is generated by calculating a CRC32 of the configured metrics 
subscriptions including
+     * the PushIntervalMs, XORed with the ClientInstanceId.
+     */
+    private int computeSubscriptionId(Set<String> metrics, int pushIntervalMs, 
Uuid clientInstanceId) {
+        CRC32 crc = new CRC32();
+        byte[] metricsBytes = (metrics.toString() + 
pushIntervalMs).getBytes(StandardCharsets.UTF_8);
+        crc.update(ByteBuffer.wrap(metricsBytes));
+        return (int) crc.getValue() ^ clientInstanceId.hashCode();
+    }
+
+    private GetTelemetrySubscriptionsResponse 
createGetSubscriptionResponse(Uuid clientInstanceId,
+        ClientMetricsInstance clientInstance, int telemetryMaxBytes, int 
throttleMs) {
+
+        GetTelemetrySubscriptionsResponseData data = new 
GetTelemetrySubscriptionsResponseData()
+            .setClientInstanceId(clientInstanceId)
+            .setSubscriptionId(clientInstance.subscriptionId())
+            .setRequestedMetrics(new ArrayList<>(clientInstance.metrics()))
+            .setAcceptedCompressionTypes(SUPPORTED_COMPRESSION_TYPES)
+            .setPushIntervalMs(clientInstance.pushIntervalMs())
+            .setTelemetryMaxBytes(telemetryMaxBytes)
+            .setDeltaTemporality(true)
+            .setErrorCode(Errors.NONE.code())
+            .setThrottleTimeMs(throttleMs);
+
+        return new GetTelemetrySubscriptionsResponse(data);
+    }
+
+    private void validateGetRequest(GetTelemetrySubscriptionsRequest request,
+        ClientMetricsInstance clientInstance, long timestamp) {
+
+        if (!clientInstance.maybeUpdateGetRequestEpoch(timestamp) && 
(clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID

Review Comment:
   UNKNOWN_SUBSCRIPTION_ID and UNSUPPORTED_COMPRESSION_TYPE are the only two
exception types defined in KIP-714 that allow the client to retry the
GetTelemetrySubscriptions request immediately after a PushTelemetry request
fails with one of them. For all other exception types, the API call should
either be disallowed or follow the regular push-interval cycle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to