apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1400048884
########## core/src/main/java/kafka/server/ClientMetricsManager.java: ########## @@ -16,31 +16,421 @@ */ package kafka.server; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import 
java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * Handles client telemetry metrics requests/responses, subscriptions and instance information. */ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); - private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( + Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); + // Max cache size (16k active client connections per broker) + private static final int CM_CACHE_MAX_SIZE = 16384; + + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, SubscriptionInfo> subscriptionMap; + private final KafkaConfig config; + private final Time time; + + // The latest subscription version is used to determine if subscription has changed and needs + // to re-evaluate the client instance subscription id as per changed subscriptions. + private final AtomicInteger subscriptionUpdateVersion; - public static ClientMetricsManager instance() { - return INSTANCE; + public ClientMetricsManager(KafkaConfig config, Time time) { + this.subscriptionMap = new ConcurrentHashMap<>(); + this.subscriptionUpdateVersion = new AtomicInteger(0); + this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); + this.config = config; + this.time = time; } public void updateSubscription(String subscriptionName, Properties properties) { - // TODO: Implement the update logic to manage subscriptions. + // Validate the subscription properties. + ClientMetricsConfigs.validate(subscriptionName, properties); + // IncrementalAlterConfigs API will send empty configs when all the configs are deleted + // for respective subscription. 
In that case, we need to remove the subscription from the map. + if (properties.isEmpty()) { + // Remove the subscription from the map if it exists, else ignore the config update. + if (subscriptionMap.containsKey(subscriptionName)) { + log.info("Removing subscription [{}] from the subscription map", subscriptionName); + subscriptionMap.remove(subscriptionName); + this.subscriptionUpdateVersion.incrementAndGet(); + } + return; + } + + updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); + } + + public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( + GetTelemetrySubscriptionsRequest request, RequestContext requestContext, int throttleMs) { + + long now = time.milliseconds(); + Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) + .filter(id -> !id.equals(Uuid.ZERO_UUID)) + .orElse(generateNewClientId()); + + /* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. + */ + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the get request parameters for the client instance. 
+ validateGetRequest(request, clientInstance, now); + } catch (ApiException exception) { + return request.getErrorResponse(0, exception); + } + + clientInstance.lastKnownError(Errors.NONE); + return createGetSubscriptionResponse(clientInstanceId, clientInstance, throttleMs); + } + + public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, + RequestContext requestContext, int throttleMs) { + + Uuid clientInstanceId = request.data().clientInstanceId(); + if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { + String msg = String.format("Invalid request from the client [%s], invalid client instance id", + clientInstanceId); + return request.getErrorResponse(0, new InvalidRequestException(msg)); + } + + long now = time.milliseconds(); + ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + + try { + // Validate the push request parameters for the client instance. + validatePushRequest(request, clientInstance, now); + } catch (ApiException exception) { + log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception); + clientInstance.lastKnownError(Errors.forException(exception)); + return request.getErrorResponse(0, exception); + } finally { + // Update the client instance with the latest push request parameters. + clientInstance.terminating(request.data().terminating()); + } + + // Push the metrics to the external client receiver plugin. 
+ byte[] metrics = request.data().metrics(); + if (metrics != null && metrics.length > 0) { + try { + ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request); + } catch (Exception exception) { + clientInstance.lastKnownError(Errors.INVALID_RECORD); + return request.errorResponse(throttleMs, Errors.INVALID_RECORD); + } + } + + clientInstance.lastKnownError(Errors.NONE); + return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs)); Review Comment: Thanks @junrao for explaining this. I have removed throttleMs from the PushTelemetry response. I also created the Jira below to track the required changes in QuotaManager. Resolving the comment. https://issues.apache.org/jira/browse/KAFKA-15863 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org