apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391650483
########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + + // The last subscription updated time is used to determine if the next telemetry request needs + // to re-evaluate the subscription id as per changes subscriptions. + private long lastSubscriptionUpdateEpoch; + + // Visible for testing + ClientMetricsManager() { + subscriptionMap = new ConcurrentHashMap<>(); + clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + updateLastSubscriptionUpdateEpoch(); + } + return; + } + + ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); + updateClientSubscription(subscriptionName, configs); + /* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+ */ + updateLastSubscriptionUpdateEpoch(); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + long now = System.currentTimeMillis(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + + try { + // Validate the get request parameters for the client instance. 
+ validateGetRequest(request, clientInstance); + } catch (ApiException exception) { + return request.getErrorResponse(throttleMs, exception); + } finally { + clientInstance.lastGetRequestEpoch(now); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); Review Comment: I asked the same question in another comment, but shouldn't we set that, since the API call goes through `sendMaybeThrottle` in KafkaApis, which throttles the request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org