chia7712 commented on code in PR #19807:
URL: https://github.com/apache/kafka/pull/19807#discussion_r2185784855


##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;

Review Comment:
   Perhaps those new files should be moved to the
`org.apache.kafka.server.quota` package?



##########
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##########
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class ClientQuotaManager {
+
+    public static final int NO_QUOTAS = 0;
+    public static final int CLIENT_ID_QUOTA_ENABLED = 1;
+    public static final int USER_QUOTA_ENABLED = 2;
+    public static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+    public static final int CUSTOM_QUOTAS = 8; // No metric update 
optimizations are used with custom quotas
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+    private final ConcurrentHashMap<Integer, Integer> activeQuotaEntities = 
new ConcurrentHashMap<>();
+
+    // Purge sensors after 1 hour of inactivity
+    private static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+    private static final String DEFAULT_NAME = "<default>";
+
+    public record UserEntity(String sanitizedUser) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.USER;
+        }
+
+        @Override
+        public String name() {
+            return Sanitizer.desanitize(sanitizedUser);
+        }
+
+        @Override
+        public String toString() {
+            return "user " + sanitizedUser;
+        }
+    }
+    public record ClientIdEntity(String clientId) implements 
ClientQuotaEntity.ConfigEntity {
+
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.CLIENT_ID;
+        }
+
+        @Override
+        public String name() {
+            return clientId;
+        }
+
+        @Override
+        public String toString() {
+            return "client-id " + clientId;
+        }
+    }
+
+    public static final ClientQuotaEntity.ConfigEntity DEFAULT_USER_ENTITY = 
new ClientQuotaEntity.ConfigEntity() {
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_USER;
+        }
+
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+
+        @Override
+        public String toString() {
+            return "default user";
+        }
+    };
+
+    public static final ClientQuotaEntity.ConfigEntity DEFAULT_USER_CLIENT_ID 
= new ClientQuotaEntity.ConfigEntity() {
+        @Override
+        public ClientQuotaEntity.ConfigEntityType entityType() {
+            return ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID;
+        }
+        @Override
+        public String name() {
+            return DEFAULT_NAME;
+        }
+        @Override
+        public String toString() {
+            return "default client-id";
+        }
+    };
+
+    private static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(null, DEFAULT_USER_CLIENT_ID);
+    private static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DEFAULT_USER_ENTITY, null);
+    private static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+            new KafkaQuotaEntity(DEFAULT_USER_ENTITY, DEFAULT_USER_CLIENT_ID);
+
+    public record KafkaQuotaEntity(ClientQuotaEntity.ConfigEntity userEntity,
+                                   ClientQuotaEntity.ConfigEntity 
clientIdEntity) implements ClientQuotaEntity {
+
+        @Override
+        public List<ConfigEntity> configEntities() {
+            List<ClientQuotaEntity.ConfigEntity> entities = new ArrayList<>();
+            if (userEntity != null) {
+                entities.add(userEntity);
+            }
+            if (clientIdEntity != null) {
+                entities.add(clientIdEntity);
+            }
+            return entities;
+        }
+
+        public String sanitizedUser() {
+            if (userEntity instanceof UserEntity userRecord) {
+                return userRecord.sanitizedUser();
+            } else if (userEntity == DEFAULT_USER_ENTITY) {
+                return DEFAULT_NAME;
+            }
+            return "";
+        }
+
+        public String clientId() {
+            return clientIdEntity != null ? clientIdEntity.name() : "";
+        }
+
+        @Override
+        public String toString() {
+            String user = userEntity != null ? userEntity.toString() : "";
+            String clientId = clientIdEntity != null ? 
clientIdEntity.toString() : "";
+            return (user + " " + clientId).trim();
+        }
+    }
+
+    public static final String USER_TAG = "user";
+    public static final String CLIENT_ID_TAG = "client-id";
+
+    private final ClientQuotaManagerConfig config;
+    protected final Metrics metrics;
+    private final QuotaType quotaType;
+    protected final Time time;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccessor;
+    private final ClientQuotaCallback quotaCallback;
+    private final ClientQuotaType clientQuotaType;
+
+    private volatile int quotaTypesEnabled;
+
+    public int quotaTypesEnabled() {
+        return quotaTypesEnabled;
+    }
+
+    private final Sensor delayQueueSensor;
+    private final DelayQueue<ThrottledChannel> delayQueue = new DelayQueue<>();
+    private final ThrottledChannelReaper throttledChannelReaper;
+
+    public void processThrottledChannelReaperDoWork() {
+        throttledChannelReaper.doWork();
+    }
+
+    /**
+     * Helper class that records per-client metrics. It is also responsible 
for maintaining Quota usage statistics
+     * for all clients.
+     * <p/>
+     * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+     * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+     * and a user quota match a connection, the <user, client-id> quota will 
be used. Otherwise, user quota takes
+     * precedence over client-id quota. The order of precedence is:
+     * <ul>
+     *   <li>/config/users/<user>/clients/<client-id>
+     *   <li>/config/users/<user>/clients/<default>
+     *   <li>/config/users/<user>
+     *   <li>/config/users/<default>/clients/<client-id>
+     *   <li>/config/users/<default>/clients/<default>
+     *   <li>/config/users/<default>
+     *   <li>/config/clients/<client-id>
+     *   <li>/config/clients/<default>
+     * </ul>
+     * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+     * where a single level of quotas is configured.
+     * @param config the ClientQuotaManagerConfig containing quota 
configurations
+     * @param metrics the Metrics instance for recording quota-related metrics
+     * @param quotaType the quota type managed by this quota manager
+     * @param time the Time object used for time-based operations
+     * @param threadNamePrefix the thread name prefix used for internal threads
+     * @param clientQuotaCallbackPlugin optional Plugin containing a 
ClientQuotaCallback for custom quota logic
+     */
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix,
+                              Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.time = time;
+        this.sensorAccessor = new SensorAccess(lock, metrics);
+        this.clientQuotaType = QuotaType.toClientQuotaType(quotaType);
+        this.quotaTypesEnabled = clientQuotaCallbackPlugin.isPresent() ?
+                CUSTOM_QUOTAS : NO_QUOTAS;
+        this.delayQueueSensor = metrics.sensor(quotaType + "-delayQueue");
+        this.delayQueueSensor.add(metrics.metricName("queue-size", 
quotaType.toString(),
+                "Tracks the size of the delay queue"), new CumulativeSum());
+        this.throttledChannelReaper = new ThrottledChannelReaper(delayQueue, 
threadNamePrefix);
+        this.quotaCallback = clientQuotaCallbackPlugin
+                .map(Plugin::get)
+                .orElse(new DefaultQuotaCallback());
+
+        start(); // Extract thread start to separate method to avoid 
SC_START_IN_CTOR warning
+    }
+
+    public ClientQuotaManager(ClientQuotaManagerConfig config,
+                              Metrics metrics,
+                              QuotaType quotaType,
+                              Time time,
+                              String threadNamePrefix) {
+        this(config, metrics, quotaType, time, threadNamePrefix, 
Optional.empty());
+    }
+
+    protected Metrics metrics() {
+        return metrics;
+    }
+    protected Time time() {
+        return time;
+    }
+
+    private void start() {
+        throttledChannelReaper.start();
+    }
+
+    /**
+     * Reaper thread that triggers channel unmute callbacks on all throttled 
channels
+     */
+    public class ThrottledChannelReaper extends ShutdownableThread {
+        private final DelayQueue<ThrottledChannel> delayQueue;
+
+        public ThrottledChannelReaper(DelayQueue<ThrottledChannel> delayQueue, 
String prefix) {
+            super(prefix + "ThrottledChannelReaper-" + quotaType, false);
+            this.delayQueue = delayQueue;
+        }
+
+        @Override
+        public void doWork() {
+            ThrottledChannel throttledChannel;
+            try {
+                throttledChannel = delayQueue.poll(1, TimeUnit.SECONDS);
+                if (throttledChannel != null) {
+                    // Decrement the size of the delay queue
+                    delayQueueSensor.record(-1);
+                    // Notify the socket server that throttling is done for 
this channel
+                    throttledChannel.notifyThrottlingDone();
+                }
+            } catch (InterruptedException e) {
+                // Ignore and continue
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Returns true if any quotas are enabled for this quota manager. This is 
used
+     * to determine if quota-related metrics should be created.
+     */
+    public boolean quotasEnabled() {
+        return quotaTypesEnabled != NO_QUOTAS;
+    }
+
+    /**
+     * See recordAndGetThrottleTimeMs.
+     */
+    public int maybeRecordAndGetThrottleTimeMs(Session session, String 
clientId, double value, long timeMs) {
+        // Record metrics only if quotas are enabled.
+        if (quotasEnabled()) {
+            return recordAndGetThrottleTimeMs(session, clientId, value, 
timeMs);
+        } else {
+            return 0;
+        }
+    }
+
+    /**
+     * Records that a user/clientId accumulated or would like to accumulate 
the provided amount at the
+     * specified time, returns throttle time in milliseconds.
+     *
+     * @param session The session from which the user is extracted
+     * @param clientId The client id
+     * @param value The value to accumulate
+     * @param timeMs The time at which to accumulate the value
+     * @return The throttle time in milliseconds defines as the time to wait 
until the average
+     *         rate gets back to the defined quota
+     */
+    public int recordAndGetThrottleTimeMs(Session session, String clientId, 
double value, long timeMs) {
+        var clientSensors = getOrCreateQuotaSensors(session, clientId);
+        try {
+            clientSensors.quotaSensor().record(value, timeMs, true);
+            return 0;
+        } catch (QuotaViolationException e) {
+            var throttleTimeMs = (int) throttleTime(e, timeMs);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Quota violated for sensor ({}). Delay time: ({})",
+                        clientSensors.quotaSensor().name(), throttleTimeMs);
+            }
+            return throttleTimeMs;
+        }
+    }
+
+    /**
+     * Records that a user/clientId changed some metric being throttled 
without checking for
+     * quota violation. The aggregate value will subsequently be used for 
throttling when the
+     * next request is processed.
+     */
+    public void recordNoThrottle(Session session, String clientId, double 
value) {
+        var clientSensors = getOrCreateQuotaSensors(session, clientId);
+        clientSensors.quotaSensor().record(value, time.milliseconds(), false);
+    }
+
+    /**
+     * "Unrecord" the given value that has already been recorded for the given 
user/client by recording a negative value
+     * of the same quantity.
+     * For a throttled fetch, the broker should return an empty response and 
thus should not record the value. Ideally,
+     * we would like to compute the throttle time before actually recording 
the value, but the current Sensor code
+     * couples value recording and quota checking very tightly. As a 
workaround, we will unrecord the value for the fetch
+     * in case of throttling. Rate keeps the sum of values that fall in each 
time window, so this should bring the
+     * overall sum back to the previous value.
+     */
+    public void unrecordQuotaSensor(Session session, String clientId, double 
value, long timeMs) {
+        var clientSensors = getOrCreateQuotaSensors(session, clientId);
+        clientSensors.quotaSensor().record(value * -1, timeMs, false);
+    }
+
+    /**
+     * Returns the maximum value that could be recorded without guaranteed 
throttling.
+     * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
+     * This is used for deciding the maximum bytes that can be fetched at once
+     */
+    public double maxValueInQuotaWindow(Session session, String clientId) {
+        if (!quotasEnabled()) return Double.MAX_VALUE;
+        var clientSensors = getOrCreateQuotaSensors(session, clientId);
+        var limit = quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags());
+        if (limit != null) return limit * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds;
+        return Double.MAX_VALUE;
+    }
+
+    /**
+     * Throttle a client by muting the associated channel for the given 
throttle time.
+     * @param clientId request client id
+     * @param session request session
+     * @param throttleTimeMs Duration in milliseconds for which the channel is 
to be muted.
+     * @param throttleCallback Callback for channel throttling
+     */
+    public void throttle(
+            String clientId,
+            Session session,
+            ThrottleCallback throttleCallback,
+            int throttleTimeMs
+    ) {
+        if (throttleTimeMs > 0) {
+            var clientSensors = getOrCreateQuotaSensors(session, clientId);
+            clientSensors.throttleTimeSensor().record(throttleTimeMs);
+            var throttledChannel = new ThrottledChannel(time, throttleTimeMs, 
throttleCallback);
+            delayQueue.add(throttledChannel);
+            delayQueueSensor.record();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Channel throttled for sensor ({}). Delay time: 
({})",
+                        clientSensors.quotaSensor().name(), throttleTimeMs);
+            }
+        }
+    }
+
+    /**
+     * Returns the quota for the client with the specified (non-encoded) user 
principal and client-id.
+     * Note: this method is expensive, it is meant to be used by tests only
+     */
+    public Quota quota(String user, String clientId) {
+        var userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user);
+        return quota(userPrincipal, clientId);
+    }
+
+    /**
+     * Returns the quota for the client with the specified user principal and 
client-id.
+     * Note: this method is expensive, it is meant to be used by tests only
+     */
+    public Quota quota(KafkaPrincipal userPrincipal, String clientId) {
+        var metricTags = quotaCallback.quotaMetricTags(clientQuotaType, 
userPrincipal, clientId);
+        return Quota.upperBound(quotaLimit(metricTags));
+    }
+
+    private double quotaLimit(Map<String, String> metricTags) {
+        var limit = quotaCallback.quotaLimit(clientQuotaType, metricTags);
+        return limit != null ? limit : Long.MAX_VALUE;
+    }
+
+    /**
+     * This calculates the amount of time needed to bring the metric within 
quota
+     * assuming that no new metrics are recorded.
+     */
+    protected long throttleTime(QuotaViolationException e, long timeMs) {
+        return QuotaUtils.throttleTime(e, timeMs);
+    }
+
+
+    /**
+     * This function either returns the sensors for a given client id or 
creates them if they don't exist
+     */
+    public ClientSensors getOrCreateQuotaSensors(Session session, String 
clientId) {
+        var metricTags = quotaCallback instanceof DefaultQuotaCallback 
defaultCallback
+                ? defaultCallback.quotaMetricTags(session.sanitizedUser, 
clientId)
+                : quotaCallback.quotaMetricTags(clientQuotaType, 
session.principal, clientId);
+        var sensors = new ClientSensors(
+                metricTags,
+                sensorAccessor.getOrCreate(
+                        getQuotaSensorName(metricTags),
+                        INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
+                        sensor -> registerQuotaMetrics(metricTags, sensor)  // 
quotaLimit() called here only for new sensors
+                ),
+                sensorAccessor.getOrCreate(
+                        getThrottleTimeSensorName(metricTags),
+                        INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
+                        sensor -> sensor.add(throttleMetricName(metricTags), 
new Avg())
+                )
+        );
+
+        if (quotaCallback.quotaResetRequired(clientQuotaType)) {
+            updateQuotaMetricConfigs();
+        }
+
+        return sensors;
+    }
+
+    protected void registerQuotaMetrics(Map<String, String> metricTags, Sensor 
sensor) {
+        sensor.add(
+                clientQuotaMetricName(metricTags),
+                new Rate(),
+                getQuotaMetricConfig(metricTags)
+        );
+    }
+
+    private String metricTagsToSensorSuffix(Map<String, String> metricTags) {
+        return String.join(":", metricTags.values());
+    }
+
+    private String getThrottleTimeSensorName(Map<String, String> metricTags) {
+        return quotaType.toString() + "ThrottleTime-" + 
metricTagsToSensorSuffix(metricTags);
+    }
+
+    private String getQuotaSensorName(Map<String, String> metricTags) {
+        return quotaType.toString() + "-" + 
metricTagsToSensorSuffix(metricTags);
+    }
+
+    protected MetricConfig getQuotaMetricConfig(Map<String, String> 
metricTags) {
+        return getQuotaMetricConfig(quotaLimit(metricTags));
+    }
+
+    private MetricConfig getQuotaMetricConfig(double quotaLimit) {
+        return new MetricConfig()
+                .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+                .samples(config.numQuotaSamples)
+                .quota(new Quota(quotaLimit, true));
+    }
+
+    protected Sensor getOrCreateSensor(String sensorName, long 
expirationTimeSeconds, Consumer<Sensor> registerMetrics) {
+        return sensorAccessor.getOrCreate(
+                sensorName,
+                expirationTimeSeconds,
+                registerMetrics);
+    }
+
+    /**
+     * Overrides quotas for <user>, <client-id> or <user, client-id> or the 
dynamic defaults
+     * for any of these levels.
+     *
+     * @param userEntity   user to override if quota applies to <user> or 
<user, client-id>
+     * @param clientEntity sanitized client entity to override if quota 
applies to <client-id> or <user, client-id>
+     * @param quota        custom quota to apply or None if quota override is 
being removed
+     */
+    public void updateQuota(
+            Optional<ClientQuotaEntity.ConfigEntity> userEntity,
+            Optional<ClientQuotaEntity.ConfigEntity> clientEntity,
+            Optional<Quota> quota
+    ) {
+        /*
+         * Acquire the write lock to apply changes in the quota objects.
+         * This method changes the quota in the overriddenQuota map and 
applies the update on the actual KafkaMetric object (if it exists).
+         * If the KafkaMetric hasn't been created, the most recent value will 
be used from the overriddenQuota map.
+         * The write lock prevents quota update and creation at the same time. 
It also guards against concurrent quota change
+         * notifications
+         */
+        lock.writeLock().lock();
+        try {
+            var quotaEntity = new KafkaQuotaEntity(userEntity.orElse(null), 
clientEntity.orElse(null));
+
+            // Apply quota changes with proper quota type tracking
+            if (quota.isPresent()) {
+                updateQuotaTypes(quotaEntity, true);
+                quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
quota.get().bound());
+            } else {
+                updateQuotaTypes(quotaEntity, false);
+                quotaCallback.removeQuota(clientQuotaType, quotaEntity);
+            }
+
+            // Determine which entities need metric config updates
+            Optional<KafkaQuotaEntity> updatedEntity;
+            if (userEntity.filter(entity -> entity == 
DEFAULT_USER_ENTITY).isPresent() ||
+                    clientEntity.filter(entity -> entity == 
DEFAULT_USER_CLIENT_ID).isPresent()) {
+                // More than one entity may need updating, so 
updateQuotaMetricConfigs will go through all metrics
+                updatedEntity = Optional.empty();
+            } else {
+                updatedEntity = Optional.of(quotaEntity);
+            }
+
+            updateQuotaMetricConfigs(updatedEntity);

Review Comment:
   Since `updateQuotaMetricConfigs` iterates through all metrics, I'm unsure 
why it must be executed under a write lock. Perhaps we could create a follow-up 
ticket for this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to