This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fb6e9ec1c0 KAFKA-17840 Move ReplicationQuotaManager, 
ClientRequestQuotaManager and QuotaFactory to server module (#17609)
7fb6e9ec1c0 is described below

commit 7fb6e9ec1c059289740f72c0e27babad24d9beec
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Oct 30 21:18:28 2024 +0800

    KAFKA-17840 Move ReplicationQuotaManager, ClientRequestQuotaManager and 
QuotaFactory to server module (#17609)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/ClientRequestQuotaManager.java    | 107 +++++++++++++
 core/src/main/java/kafka/server/QuotaFactory.java  | 159 ++++++++++++++++++
 core/src/main/java/kafka/server/ReplicaQuota.java  |  25 +++
 .../java/kafka/server/ReplicationQuotaManager.java | 154 ++++++++++++++++++
 .../java/kafka/server/share/DelayedShareFetch.java |   2 +-
 .../main/scala/kafka/network/RequestChannel.scala  |   8 +-
 .../kafka/server/ClientRequestQuotaManager.scala   |  94 -----------
 .../main/scala/kafka/server/ConfigHandler.scala    |   5 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   9 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   6 +-
 .../scala/kafka/server/LocalLeaderEndPoint.scala   |   4 +-
 .../src/main/scala/kafka/server/QuotaFactory.scala |  97 -----------
 .../kafka/server/ReplicationQuotaManager.scala     | 177 ---------------------
 .../kafka/server/LocalLeaderEndPointTest.scala     |   2 +-
 .../kafka/server/RemoteLeaderEndPointTest.scala    |   2 +-
 .../server/ClientRequestQuotaManagerTest.scala     |   6 +-
 .../unit/kafka/server/ControllerApisTest.scala     |  10 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |   8 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   4 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     |   4 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   4 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  14 +-
 .../kafka/server/ReplicationQuotaManagerTest.scala |   2 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   2 +-
 26 files changed, 496 insertions(+), 416 deletions(-)

diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java 
b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
new file mode 100644
index 00000000000..f93c50b2783
--- /dev/null
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.network.RequestChannel;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+@SuppressWarnings("this-escape")
+public class ClientRequestQuotaManager extends ClientQuotaManager {
+    // Since exemptSensor is for all clients and has a constant name, we do 
not expire exemptSensor and only
+    // create once.
+    static final double NANOS_TO_PERCENTAGE_PER_SECOND = 100.0 / 
TimeUnit.SECONDS.toNanos(1);
+    private static final long 
DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS = Long.MAX_VALUE;
+    private static final String EXEMPT_SENSOR_NAME = "exempt-" + 
QuotaType.REQUEST;
+
+    private final long maxThrottleTimeMs;
+    private final Metrics metrics;
+    private final MetricName exemptMetricName;
+    // Visible for testing
+    private final Sensor exemptSensor;
+
+    public ClientRequestQuotaManager(ClientQuotaManagerConfig config, Metrics 
metrics, Time time, String threadNamePrefix, Optional<ClientQuotaCallback> 
quotaCallback) {
+        super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, 
OptionConverters.toScala(quotaCallback));
+        this.maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
+        this.metrics = metrics;
+        this.exemptMetricName = metrics.metricName("exempt-request-time", 
QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization 
percentage");
+        exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME, 
DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor -> 
sensor.add(exemptMetricName, new Rate()));
+    }
+
+    public Sensor exemptSensor() {
+        return exemptSensor;
+    }
+
+    private void recordExempt(double value) {
+        exemptSensor.record(value);
+    }
+
+    /**
+     * Records that a user/clientId changed request processing time being 
throttled. If quota has been violated, return
+     * throttle time in milliseconds. Throttle time calculation may be 
overridden by sub-classes.
+     * @param request client request
+     * @return Number of milliseconds to throttle in case of quota violation. 
Zero otherwise
+     */
+    public int maybeRecordAndGetThrottleTimeMs(RequestChannel.Request request, 
long timeMs) {
+        if (quotasEnabled()) {
+            request.setRecordNetworkThreadTimeCallback(timeNanos -> {
+                recordNoThrottle(request.session(), 
request.header().clientId(), 
nanosToPercentage(Long.parseLong(timeNanos.toString())));
+            });
+            return recordAndGetThrottleTimeMs(request.session(), 
request.header().clientId(), 
nanosToPercentage(request.requestThreadTimeNanos()), timeMs);
+        } else {
+            return 0;
+        }
+    }
+
+    public void maybeRecordExempt(RequestChannel.Request request) {
+        if (quotasEnabled()) {
+            request.setRecordNetworkThreadTimeCallback(timeNanos -> {
+                
recordExempt(nanosToPercentage(Long.parseLong(timeNanos.toString())));
+            });
+            recordExempt(nanosToPercentage(request.requestThreadTimeNanos()));
+        }
+    }
+
+    @Override
+    public long throttleTime(QuotaViolationException e, long timeMs) {
+        return QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs);
+    }
+
+    @Override
+    public MetricName 
clientQuotaMetricName(scala.collection.immutable.Map<String, String> 
quotaMetricTags) {
+        return metrics.metricName("request-time", 
QuotaType.REQUEST.toString(), "Tracking request-time per user/client-id", 
CollectionConverters.asJava(quotaMetricTags));
+    }
+
+    private double nanosToPercentage(long nanos) {
+        return nanos * NANOS_TO_PERCENTAGE_PER_SECOND;
+    }
+}
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java 
b/core/src/main/java/kafka/server/QuotaFactory.java
new file mode 100644
index 00000000000..57e9139e285
--- /dev/null
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+
+import java.util.Optional;
+
+import scala.Option;
+
+public class QuotaFactory {
+
+    public static final ReplicaQuota UNBOUNDED_QUOTA = new ReplicaQuota() {
+        @Override
+        public boolean isThrottled(TopicPartition topicPartition) {
+            return false;
+        }
+
+        @Override
+        public boolean isQuotaExceeded() {
+            return false;
+        }
+
+        @Override
+        public void record(long value) {
+            // No-op
+        }
+    };
+
+    public static class QuotaManagers {
+        private final ClientQuotaManager fetch;
+        private final ClientQuotaManager produce;
+        private final ClientRequestQuotaManager request;
+        private final ControllerMutationQuotaManager controllerMutation;
+        private final ReplicationQuotaManager leader;
+        private final ReplicationQuotaManager follower;
+        private final ReplicationQuotaManager alterLogDirs;
+        private final Optional<ClientQuotaCallback> clientQuotaCallback;
+
+        public QuotaManagers(ClientQuotaManager fetch, ClientQuotaManager 
produce, ClientRequestQuotaManager request,
+                             ControllerMutationQuotaManager 
controllerMutation, ReplicationQuotaManager leader,
+                             ReplicationQuotaManager follower, 
ReplicationQuotaManager alterLogDirs,
+                             Optional<ClientQuotaCallback> 
clientQuotaCallback) {
+            this.fetch = fetch;
+            this.produce = produce;
+            this.request = request;
+            this.controllerMutation = controllerMutation;
+            this.leader = leader;
+            this.follower = follower;
+            this.alterLogDirs = alterLogDirs;
+            this.clientQuotaCallback = clientQuotaCallback;
+        }
+
+        public ClientQuotaManager fetch() {
+            return fetch;
+        }
+
+        public ClientQuotaManager produce() {
+            return produce;
+        }
+
+        public ClientRequestQuotaManager request() {
+            return request;
+        }
+
+        public ControllerMutationQuotaManager controllerMutation() {
+            return controllerMutation;
+        }
+
+        public ReplicationQuotaManager leader() {
+            return leader;
+        }
+
+        public ReplicationQuotaManager follower() {
+            return follower;
+        }
+
+        public ReplicationQuotaManager alterLogDirs() {
+            return alterLogDirs;
+        }
+
+        public Optional<ClientQuotaCallback> clientQuotaCallback() {
+            return clientQuotaCallback;
+        }
+
+        public void shutdown() {
+            fetch.shutdown();
+            produce.shutdown();
+            request.shutdown();
+            controllerMutation.shutdown();
+            clientQuotaCallback.ifPresent(ClientQuotaCallback::close);
+        }
+    }
+
+    public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics, 
Time time, String threadNamePrefix) {
+        ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
+            QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, 
ClientQuotaCallback.class);
+
+        return new QuotaManagers(
+            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+            new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, Optional.ofNullable(clientQuotaCallback)),
+            new 
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, 
time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+            new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.LEADER_REPLICATION, time),
+            new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.FOLLOWER_REPLICATION, time),
+            new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), 
metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
+            Optional.ofNullable(clientQuotaCallback)
+        );
+    }
+
+    private static ClientQuotaManagerConfig clientConfig(KafkaConfig cfg) {
+        return new ClientQuotaManagerConfig(
+            cfg.quotaConfig().numQuotaSamples(),
+            cfg.quotaConfig().quotaWindowSizeSeconds()
+        );
+    }
+
+    private static ClientQuotaManagerConfig 
clientControllerMutationConfig(KafkaConfig cfg) {
+        return new ClientQuotaManagerConfig(
+            cfg.quotaConfig().numControllerQuotaSamples(),
+            cfg.quotaConfig().controllerQuotaWindowSizeSeconds()
+        );
+    }
+
+    private static ReplicationQuotaManagerConfig replicationConfig(KafkaConfig 
cfg) {
+        return new ReplicationQuotaManagerConfig(
+            cfg.quotaConfig().numReplicationQuotaSamples(),
+            cfg.quotaConfig().replicationQuotaWindowSizeSeconds()
+        );
+    }
+
+    private static ReplicationQuotaManagerConfig 
alterLogDirsReplicationConfig(KafkaConfig cfg) {
+        return new ReplicationQuotaManagerConfig(
+            cfg.quotaConfig().numAlterLogDirsReplicationQuotaSamples(),
+            cfg.quotaConfig().alterLogDirsReplicationQuotaWindowSizeSeconds()
+        );
+    }
+}
\ No newline at end of file
diff --git a/core/src/main/java/kafka/server/ReplicaQuota.java 
b/core/src/main/java/kafka/server/ReplicaQuota.java
new file mode 100644
index 00000000000..7d65ba25001
--- /dev/null
+++ b/core/src/main/java/kafka/server/ReplicaQuota.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface ReplicaQuota {
+    void record(long value);
+    boolean isThrottled(TopicPartition topicPartition);
+    boolean isQuotaExceeded();
+}
\ No newline at end of file
diff --git a/core/src/main/java/kafka/server/ReplicationQuotaManager.java 
b/core/src/main/java/kafka/server/ReplicationQuotaManager.java
new file mode 100644
index 00000000000..9301e59dfe6
--- /dev/null
+++ b/core/src/main/java/kafka/server/ReplicationQuotaManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.SensorAccess;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class ReplicationQuotaManager implements ReplicaQuota {
+    public static final List<Integer> ALL_REPLICAS = List.of(-1);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ReplicationQuotaManager.class);
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ConcurrentHashMap<String, List<Integer>> throttledPartitions 
= new ConcurrentHashMap<>();
+    private final ReplicationQuotaManagerConfig config;
+    private final Metrics metrics;
+    private final QuotaType replicationType;
+    private final Time time;
+    private final SensorAccess sensorAccess;
+    private final MetricName rateMetricName;
+    private Quota quota;
+
+    public ReplicationQuotaManager(ReplicationQuotaManagerConfig config, 
Metrics metrics, QuotaType replicationType, Time time) {
+        this.config = config;
+        this.metrics = metrics;
+        this.replicationType = replicationType;
+        this.time = time;
+        this.sensorAccess = new SensorAccess(lock, metrics);
+        this.rateMetricName = metrics.metricName("byte-rate", 
replicationType.toString(), "Tracking byte-rate for " + replicationType);
+    }
+
+    /**
+     * Update the quota
+     */
+    public void updateQuota(Quota quota) {
+        lock.writeLock().lock();
+        try {
+            this.quota = quota;
+            KafkaMetric metric = metrics.metrics().get(rateMetricName);
+            if (metric != null) {
+                metric.config(getQuotaMetricConfig(quota));
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Check if the quota is currently exceeded
+     */
+    @Override
+    public boolean isQuotaExceeded() {
+        try {
+            sensor().checkQuotas();
+            return false;
+        } catch (QuotaViolationException qve) {
+            LOGGER.trace("{}: Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",
+                replicationType, sensor().name(), qve.metric().metricName(), 
qve.value(), qve.bound());
+            return true;
+        }
+    }
+
+    /**
+     * Is the passed partition throttled by this ReplicationQuotaManager
+     */
+    @Override
+    public boolean isThrottled(TopicPartition topicPartition) {
+        List<Integer> partitions = 
throttledPartitions.get(topicPartition.topic());
+        return partitions != null && (partitions.equals(ALL_REPLICAS) || 
partitions.contains(topicPartition.partition()));
+    }
+
+    /**
+     * Add the passed value to the throttled rate. This method ignores the 
quota with
+     * the value being added to the rate even if the quota is exceeded
+     */
+    @Override
+    public void record(long value) {
+        sensor().record((double) value, time.milliseconds(), false);
+    }
+
+    /**
+     * Update the set of throttled partitions for this QuotaManager. The 
partitions passed, for
+     * any single topic, will replace any previous
+     */
+    public void markThrottled(String topic, List<Integer> partitions) {
+        throttledPartitions.put(topic, partitions);
+    }
+
+    /**
+     * Mark all replicas for this topic as throttled
+     */
+    public void markThrottled(String topic) {
+        markThrottled(topic, ALL_REPLICAS);
+    }
+
+    public void removeThrottle(String topic) {
+        throttledPartitions.remove(topic);
+    }
+
+    public long upperBound() {
+        lock.readLock().lock();
+        try {
+            return quota != null ? (long) quota.bound() : Long.MAX_VALUE;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private MetricConfig getQuotaMetricConfig(Quota quota) {
+        return new MetricConfig()
+            .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+            .samples(config.numQuotaSamples)
+            .quota(quota);
+    }
+
+    private Sensor sensor() {
+        return sensorAccess.getOrCreate(
+            replicationType.toString(),
+            
ReplicationQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
+            sensor -> sensor.add(rateMetricName, new SimpleRate(), 
getQuotaMetricConfig(quota))
+        );
+    }
+}
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 732381a1976..f04b5186b8b 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -109,7 +109,7 @@ public class DelayedShareFetch extends DelayedOperation {
                     topicPartitionData.entrySet().stream().map(entry ->
                         new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
                 ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
+                QuotaFactory.UNBOUNDED_QUOTA,
                 true);
 
             Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index c48811439e2..44f5e926eb9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -73,7 +73,7 @@ object RequestChannel extends Logging {
     @volatile var messageConversionsTimeNanos: Long = 0L
     @volatile var apiThrottleTimeMs: Long = 0L
     @volatile var temporaryMemoryBytes: Long = 0L
-    @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
+    @volatile var recordNetworkThreadTimeCallback: 
Option[java.util.function.Consumer[java.lang.Long]] = None
     @volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
     @volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
 
@@ -247,7 +247,7 @@ object RequestChannel extends Logging {
       // The time recorded here is the time spent on the network thread for 
receiving this request
       // and sending the response. Note that for the first request on a 
connection, the time includes
       // the total time spent on authentication, which may be significant for 
SASL/SSL.
-      recordNetworkThreadTimeCallback.foreach(record => 
record(networkThreadTimeNanos))
+      recordNetworkThreadTimeCallback.foreach(record => 
record.accept(networkThreadTimeNanos))
 
       if (isRequestLoggingEnabled) {
         val desc = RequestConvertToJson.requestDescMetrics(header, 
requestLog.toJava, response.responseLog.toJava,
@@ -272,6 +272,10 @@ object RequestChannel extends Logging {
       }
     }
 
+    def setRecordNetworkThreadTimeCallback(callback: 
java.util.function.Consumer[java.lang.Long]): Unit = {
+      recordNetworkThreadTimeCallback = Some(callback)
+    }
+
     override def toString: String = s"Request(processor=$processor, " +
       s"connectionId=${context.connectionId}, " +
       s"session=$session, " +
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
deleted file mode 100644
index 248a7cb5125..00000000000
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.TimeUnit
-import kafka.network.RequestChannel
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.Rate
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType, 
QuotaUtils}
-
-import scala.jdk.CollectionConverters._
-
-object ClientRequestQuotaManager {
-  val NanosToPercentagePerSecond: Double = 100.0 / TimeUnit.SECONDS.toNanos(1)
-  // Since exemptSensor is for all clients and has a constant name, we do not 
expire exemptSensor and only
-  // create once.
-  private val DefaultInactiveExemptSensorExpirationTimeSeconds: Long = 
Long.MaxValue
-
-  private val ExemptSensorName = "exempt-" + QuotaType.REQUEST
-}
-
-class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
-                                private val metrics: Metrics,
-                                private val time: Time,
-                                private val threadNamePrefix: String,
-                                private val quotaCallback: 
Option[ClientQuotaCallback])
-    extends ClientQuotaManager(config, metrics, QuotaType.REQUEST, time, 
threadNamePrefix, quotaCallback) {
-
-  private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
-  private val exemptMetricName = metrics.metricName("exempt-request-time",
-    QuotaType.REQUEST.toString, "Tracking exempt-request-time utilization 
percentage")
-
-  val exemptSensor: Sensor = 
getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName,
-    ClientRequestQuotaManager.DefaultInactiveExemptSensorExpirationTimeSeconds,
-    sensor => sensor.add(exemptMetricName, new Rate))
-
-  private def recordExempt(value: Double): Unit = {
-    exemptSensor.record(value)
-  }
-
-  /**
-    * Records that a user/clientId changed request processing time being 
throttled. If quota has been violated, return
-    * throttle time in milliseconds. Throttle time calculation may be 
overridden by sub-classes.
-    * @param request client request
-    * @return Number of milliseconds to throttle in case of quota violation. 
Zero otherwise
-    */
-  def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request, timeMs: 
Long): Int = {
-    if (quotasEnabled) {
-      request.recordNetworkThreadTimeCallback = Some(timeNanos => 
recordNoThrottle(
-        request.session, request.header.clientId, 
nanosToPercentage(timeNanos)))
-      recordAndGetThrottleTimeMs(request.session, request.header.clientId,
-        nanosToPercentage(request.requestThreadTimeNanos), timeMs)
-    } else {
-      0
-    }
-  }
-
-  def maybeRecordExempt(request: RequestChannel.Request): Unit = {
-    if (quotasEnabled) {
-      request.recordNetworkThreadTimeCallback = Some(timeNanos => 
recordExempt(nanosToPercentage(timeNanos)))
-      recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
-    }
-  }
-
-  override protected def throttleTime(e: QuotaViolationException, timeMs: 
Long): Long = {
-    QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
-  }
-
-  override protected def clientQuotaMetricName(quotaMetricTags: Map[String, 
String]): MetricName = {
-    metrics.metricName("request-time", QuotaType.REQUEST.toString,
-      "Tracking request-time per user/client-id",
-      quotaMetricTags.asJava)
-  }
-
-  private def nanosToPercentage(nanos: Long): Double =
-    nanos * ClientRequestQuotaManager.NanosToPercentagePerSecond
-}
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 3f665c4eb91..35c521bfa63 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -22,7 +22,6 @@ import java.util.{Collections, Properties}
 import kafka.controller.KafkaController
 import kafka.log.UnifiedLog
 import kafka.network.ConnectionQuotas
-import kafka.server.Constants._
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
 import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs, 
ZooKeeperInternals}
@@ -132,7 +131,7 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     def updateThrottledList(prop: String, quotaManager: 
ReplicationQuotaManager): Unit = {
       if (topicConfig.containsKey(prop) && 
topicConfig.getProperty(prop).nonEmpty) {
         val partitions = parseThrottledPartitions(topicConfig, 
kafkaConfig.brokerId, prop)
-        quotaManager.markThrottled(topic, partitions)
+        quotaManager.markThrottled(topic, 
partitions.map(Integer.valueOf).asJava)
         debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: 
$topic and partitions $partitions")
       } else {
         quotaManager.removeThrottle(topic)
@@ -152,7 +151,7 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     ThrottledReplicaListValidator.ensureValidString(prop, configValue)
     configValue match {
       case "" => Seq()
-      case "*" => AllReplicas
+      case "*" => 
ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq
       case _ => configValue.trim
         .split(",")
         .map(_.split(":"))
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 79585349268..9f69a44c919 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1016,7 +1016,7 @@ class DynamicClientQuotaCallback(
 
   override def reconfigurableConfigs(): util.Set[String] = {
     val configs = new util.HashSet[String]()
-    quotaManagers.clientQuotaCallback.foreach {
+    quotaManagers.clientQuotaCallback.ifPresent {
      case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
      case _ =>
    }
@@ -1024,18 +1024,17 @@
  }
 
  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
-    quotaManagers.clientQuotaCallback.foreach {
+    quotaManagers.clientQuotaCallback.ifPresent {
      case callback: Reconfigurable => callback.validateReconfiguration(configs)
      case _ =>
    }
  }
 
  override def reconfigure(configs: util.Map[String, _]): Unit = {
-    quotaManagers.clientQuotaCallback.foreach {
+    quotaManagers.clientQuotaCallback.ifPresent {
      case callback: Reconfigurable =>
        serverConfig.dynamicConfig.maybeReconfigure(callback, serverConfig.dynamicConfig.currentKafkaConfig, configs)
-        true
-      case _ => false
+      case _ =>
    }
  }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c2f24381258..b4bb9a69198 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.controller.ReplicaAssignment
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.network.RequestChannel
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
+import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
 import kafka.server.share.SharePartitionManager
@@ -406,7 +406,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      quotas.clientQuotaCallback.foreach { callback =>
+      quotas.clientQuotaCallback.ifPresent { callback =>
         if (callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId, request.context.listenerName))) {
           quotas.fetch.updateQuotaMetricConfigs()
           quotas.produce.updateQuotaMetricConfigs()
@@ -1073,7 +1073,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
-    if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
+    if (fetchRequest.isFromFollower) quotas.leader else UNBOUNDED_QUOTA
 
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 73b661f0bb4..03258295a41 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.message.FetchResponseData
@@ -105,7 +105,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     replicaManager.fetchMessages(
       params = fetchParams,
       fetchInfos = fetchData.asScala.toSeq,
-      quota = UnboundedQuota,
+      quota = UNBOUNDED_QUOTA,
       responseCallback = processResponseCallback
     )
 
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
deleted file mode 100644
index 6d295a96369..00000000000
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.{ClientQuotaManagerConfig, QuotaConfig, 
ReplicationQuotaManagerConfig}
-
-
-object QuotaFactory extends Logging {
-
-  object UnboundedQuota extends ReplicaQuota {
-    override def isThrottled(topicPartition: TopicPartition): Boolean = false
-    override def isQuotaExceeded: Boolean = false
-    def record(value: Long): Unit = ()
-  }
-
-  case class QuotaManagers(fetch: ClientQuotaManager,
-                           produce: ClientQuotaManager,
-                           request: ClientRequestQuotaManager,
-                           controllerMutation: ControllerMutationQuotaManager,
-                           leader: ReplicationQuotaManager,
-                           follower: ReplicationQuotaManager,
-                           alterLogDirs: ReplicationQuotaManager,
-                           clientQuotaCallback: Option[ClientQuotaCallback]) {
-    def shutdown(): Unit = {
-      fetch.shutdown()
-      produce.shutdown()
-      request.shutdown()
-      controllerMutation.shutdown()
-      clientQuotaCallback.foreach(_.close())
-    }
-  }
-
-  def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, 
threadNamePrefix: String): QuotaManagers = {
-
-    val clientQuotaCallback = 
Option(cfg.getConfiguredInstance(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
-      classOf[ClientQuotaCallback]))
-    QuotaManagers(
-      new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, 
time, threadNamePrefix, clientQuotaCallback),
-      new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, 
time, threadNamePrefix, clientQuotaCallback),
-      new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, clientQuotaCallback),
-      new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), 
metrics, time,
-        threadNamePrefix, clientQuotaCallback),
-      new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.LEADER_REPLICATION, time),
-      new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.FOLLOWER_REPLICATION, time),
-      new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, 
QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
-      clientQuotaCallback
-    )
-  }
-
-  def clientConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
-    new ClientQuotaManagerConfig(
-      cfg.quotaConfig.numQuotaSamples,
-      cfg.quotaConfig.quotaWindowSizeSeconds
-    )
-  }
-
-  private def clientControllerMutationConfig(cfg: KafkaConfig): 
ClientQuotaManagerConfig = {
-    new ClientQuotaManagerConfig(
-      cfg.quotaConfig.numControllerQuotaSamples,
-      cfg.quotaConfig.controllerQuotaWindowSizeSeconds
-    )
-  }
-
-  private def replicationConfig(cfg: KafkaConfig): 
ReplicationQuotaManagerConfig = {
-    new ReplicationQuotaManagerConfig(
-      cfg.quotaConfig.numReplicationQuotaSamples,
-      cfg.quotaConfig.replicationQuotaWindowSizeSeconds
-    )
-  }
-
-  private def alterLogDirsReplicationConfig(cfg: KafkaConfig): 
ReplicationQuotaManagerConfig = {
-    new ReplicationQuotaManagerConfig(
-      cfg.quotaConfig.numAlterLogDirsReplicationQuotaSamples,
-      cfg.quotaConfig.alterLogDirsReplicationQuotaWindowSizeSeconds
-    )
-  }
-
-}
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
deleted file mode 100644
index 6ae4b5aa284..00000000000
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.collection.Seq
-import kafka.server.Constants._
-import kafka.utils.CoreUtils._
-import kafka.utils.Logging
-import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.metrics.stats.SimpleRate
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
-import org.apache.kafka.server.quota.{QuotaType, SensorAccess}
-
-trait ReplicaQuota {
-  def record(value: Long): Unit
-  def isThrottled(topicPartition: TopicPartition): Boolean
-  def isQuotaExceeded: Boolean
-}
-
-object Constants {
-  val AllReplicas = Seq[Int](-1)
-}
-
-/**
-  * Tracks replication metrics and comparing them to any quotas for throttled 
partitions.
-  *
-  * @param config          The quota configs
-  * @param metrics         The Metrics instance
-  * @param replicationType The name / key for this quota manager, typically 
leader or follower
-  * @param time            Time object to use
-  */
-class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
-                              private val metrics: Metrics,
-                              private val replicationType: QuotaType,
-                              private val time: Time) extends Logging with 
ReplicaQuota {
-  private val lock = new ReentrantReadWriteLock()
-  private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
-  private var quota: Quota = _
-  private val sensorAccess = new SensorAccess(lock, metrics)
-  private val rateMetricName = metrics.metricName("byte-rate", 
replicationType.toString,
-    s"Tracking byte-rate for $replicationType")
-
-  /**
-    * Update the quota
-    *
-    * @param quota
-    */
-  def updateQuota(quota: Quota): Unit = {
-    inWriteLock(lock) {
-      this.quota = quota
-      //The metric could be expired by another thread, so use a local variable 
and null check.
-      val metric = metrics.metrics.get(rateMetricName)
-      if (metric != null) {
-        metric.config(getQuotaMetricConfig(quota))
-      }
-    }
-  }
-
-  /**
-    * Check if the quota is currently exceeded
-    *
-    * @return
-    */
-  override def isQuotaExceeded: Boolean = {
-    try {
-      sensor().checkQuotas()
-    } catch {
-      case qve: QuotaViolationException =>
-        trace(s"$replicationType: Quota violated for sensor 
(${sensor().name}), metric: (${qve.metric.metricName}), " +
-          s"metric-value: (${qve.value}), bound: (${qve.bound})")
-        return true
-    }
-    false
-  }
-
-  /**
-    * Is the passed partition throttled by this ReplicationQuotaManager
-    *
-    * @param topicPartition the partition to check
-    * @return
-    */
-  override def isThrottled(topicPartition: TopicPartition): Boolean = {
-    val partitions = throttledPartitions.get(topicPartition.topic)
-    if (partitions != null)
-      (partitions eq AllReplicas) || 
partitions.contains(topicPartition.partition)
-    else false
-  }
-
-  /**
-    * Add the passed value to the throttled rate. This method ignores the 
quota with
-    * the value being added to the rate even if the quota is exceeded
-    *
-    * @param value
-    */
-  def record(value: Long): Unit = {
-    sensor().record(value.toDouble, time.milliseconds(), false)
-  }
-
-  /**
-    * Update the set of throttled partitions for this QuotaManager. The 
partitions passed, for
-    * any single topic, will replace any previous
-    *
-    * @param topic
-    * @param partitions the set of throttled partitions
-    * @return
-    */
-  def markThrottled(topic: String, partitions: Seq[Int]): Unit = {
-    throttledPartitions.put(topic, partitions)
-  }
-
-  /**
-    * Mark all replicas for this topic as throttled
-    *
-    * @param topic
-    * @return
-    */
-  def markThrottled(topic: String): Unit = {
-    markThrottled(topic, AllReplicas)
-  }
-
-  /**
-    * Remove list of throttled replicas for a certain topic
-    *
-    * @param topic
-    * @return
-    */
-  def removeThrottle(topic: String): Unit = {
-    throttledPartitions.remove(topic)
-  }
-
-  /**
-    * Returns the bound of the configured quota
-    *
-    * @return
-    */
-  def upperBound: Long = {
-    inReadLock(lock) {
-      if (quota != null)
-        quota.bound.toLong
-      else
-        Long.MaxValue
-    }
-  }
-
-  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
-    new MetricConfig()
-      .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
-      .samples(config.numQuotaSamples)
-      .quota(quota)
-  }
-
-  private def sensor(): Sensor = {
-    sensorAccess.getOrCreate(
-      replicationType.toString,
-      ReplicationQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
-      sensor => sensor.add(rateMetricName, new SimpleRate, 
getQuotaMetricConfig(quota))
-    )
-  }
-}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index cbb8ef3866d..6832b3a0cf7 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -80,7 +80,7 @@ class LocalLeaderEndPointTest extends Logging {
     replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
     replicaManager.getPartitionOrException(topicPartition)
       .localLogOrException
-    endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager, 
QuotaFactory.UnboundedQuota)
+    endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager, 
QuotaFactory.UNBOUNDED_QUOTA)
   }
 
   @AfterEach
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
index eaffe282d36..514ca0a1b28 100644
--- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -65,7 +65,7 @@ class RemoteLeaderEndPointTest {
         blockingSend = new MockBlockingSender(offsets = new 
util.HashMap[TopicPartition, EpochEndOffset](),
             sourceBroker = sourceBroker, time = time)
         endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, 
fetchSessionHandler,
-            config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION, () => currentBrokerEpoch)
+            config, replicaManager, QuotaFactory.UNBOUNDED_QUOTA, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION, () => currentBrokerEpoch)
     }
 
     @Test
diff --git a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
index ec09f3a7d6d..2b5471653d9 100644
--- a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
@@ -22,15 +22,17 @@ import org.apache.kafka.server.quota.QuotaType
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
+import java.util.Optional
+
 class ClientRequestQuotaManagerTest extends BaseClientQuotaManagerTest {
   private val config = new ClientQuotaManagerConfig()
 
   @Test
   def testRequestPercentageQuotaViolation(): Unit = {
-    val clientRequestQuotaManager = new ClientRequestQuotaManager(config, 
metrics, time, "", None)
+    val clientRequestQuotaManager = new ClientRequestQuotaManager(config, 
metrics, time, "", Optional.empty())
     clientRequestQuotaManager.updateQuota(Some("ANONYMOUS"), 
Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
     val queueSizeMetric = 
metrics.metrics().get(metrics.metricName("queue-size", 
QuotaType.REQUEST.toString, ""))
-    def millisToPercent(millis: Double) = millis * 1000 * 1000 * 
ClientRequestQuotaManager.NanosToPercentagePerSecond
+    def millisToPercent(millis: Double) = millis * 1000 * 1000 * 
ClientRequestQuotaManager.NANOS_TO_PERCENTAGE_PER_SECOND
     try {
       // We have 10 second windows. Make sure that there is no quota violation
       // if we are under the quota
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index c8ead4aef57..ec09e10840d 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -73,7 +73,7 @@ import java.util
 import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
@@ -127,7 +127,7 @@ class ControllerApisTest {
   private val raftManager: RaftManager[ApiMessageAndVersion] = 
mock(classOf[RaftManager[ApiMessageAndVersion]])
   private val metadataCache: KRaftMetadataCache = 
MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
 
-  private val quotasNeverThrottleControllerMutations = QuotaManagers(
+  private val quotasNeverThrottleControllerMutations = new QuotaManagers(
     clientQuotaManager,
     clientQuotaManager,
     clientRequestQuotaManager,
@@ -135,9 +135,9 @@ class ControllerApisTest {
     replicaQuotaManager,
     replicaQuotaManager,
     replicaQuotaManager,
-    None)
+    Optional.empty())
 
-  private val quotasAlwaysThrottleControllerMutations = QuotaManagers(
+  private val quotasAlwaysThrottleControllerMutations = new QuotaManagers(
     clientQuotaManager,
     clientQuotaManager,
     clientRequestQuotaManager,
@@ -145,7 +145,7 @@ class ControllerApisTest {
     replicaQuotaManager,
     replicaQuotaManager,
     replicaQuotaManager,
-    None)
+    Optional.empty())
 
   private var controllerApis: ControllerApis = _
 
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 4eea92330c9..768be4786bd 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.{lang, util}
-import java.util.{Properties, Map => JMap}
+import java.util.{Optional, Properties, Map => JMap}
 import java.util.concurrent.{CompletionStage, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import kafka.controller.KafkaController
@@ -490,7 +490,7 @@ class DynamicBrokerConfigTest {
     val metrics: Metrics = mock(classOf[Metrics])
     when(kafkaServer.metrics).thenReturn(metrics)
     val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
-    when(quotaManagers.clientQuotaCallback).thenReturn(None)
+    when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
     when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
     
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -537,7 +537,7 @@ class DynamicBrokerConfigTest {
     val metrics: Metrics = mock(classOf[Metrics])
     when(controllerServer.metrics).thenReturn(metrics)
     val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
-    when(quotaManagers.clientQuotaCallback).thenReturn(None)
+    when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
     when(controllerServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
     
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -583,7 +583,7 @@ class DynamicBrokerConfigTest {
     val metrics: Metrics = mock(classOf[Metrics])
     when(controllerServer.metrics).thenReturn(metrics)
     val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
-    when(quotaManagers.clientQuotaCallback).thenReturn(None)
+    when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
     when(controllerServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
     
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index a0687cbbcec..971cf0595be 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -20,7 +20,6 @@ import kafka.cluster.Partition
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.UnifiedLog
 import kafka.log.remote.RemoteLogManager
-import kafka.server.Constants._
 import kafka.utils.TestUtils.random
 import kafka.utils._
 import kafka.zk.ConfigEntityChangeNotificationZNode
@@ -689,7 +688,7 @@ class DynamicConfigChangeUnitTest {
     val result = configHandler.parseThrottledPartitions(props, 102, 
QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
 
     //Then
-    assertEquals(AllReplicas, result)
+    
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq, 
result)
   }
 
   @Test
@@ -700,7 +699,7 @@ class DynamicConfigChangeUnitTest {
         102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
     }
     val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, 
null, null)
-    assertEquals(AllReplicas, parse(configHandler, "* "))
+    
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq, 
parse(configHandler, "* "))
     assertEquals(Seq(), parse(configHandler, " "))
     assertEquals(Seq(6), parse(configHandler, "6:102"))
     assertEquals(Seq(6), parse(configHandler, "6:102 "))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3e4644140a3..d4515629454 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -144,8 +144,8 @@ class KafkaApisTest extends Logging {
   private val clientRequestQuotaManager: ClientRequestQuotaManager = 
mock(classOf[ClientRequestQuotaManager])
   private val clientControllerQuotaManager: ControllerMutationQuotaManager = 
mock(classOf[ControllerMutationQuotaManager])
   private val replicaQuotaManager: ReplicationQuotaManager = 
mock(classOf[ReplicationQuotaManager])
-  private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, 
clientRequestQuotaManager,
-    clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, 
replicaQuotaManager, None)
+  private val quotas = new QuotaManagers(clientQuotaManager, 
clientQuotaManager, clientRequestQuotaManager,
+    clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager, 
replicaQuotaManager, util.Optional.empty())
   private val fetchManager: FetchManager = mock(classOf[FetchManager])
   private val sharePartitionManager: SharePartitionManager = 
mock(classOf[SharePartitionManager])
   private val clientMetricsManager: ClientMetricsManager = 
mock(classOf[ClientMetricsManager])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 47f10f3179e..98436915dbf 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.cluster.Partition
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.{DelayedItem, TestUtils}
@@ -531,7 +531,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(replicaManager.fetchMessages(
       params = ArgumentMatchers.eq(expectedFetchParams),
       fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)),
-      quota = ArgumentMatchers.eq(UnboundedQuota),
+      quota = ArgumentMatchers.eq(UNBOUNDED_QUOTA),
       responseCallback = callbackCaptor.capture(),
     )).thenAnswer(_ => {
       callbackCaptor.getValue.apply(Seq((topicIdPartition, responseData)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index fde5a9261bc..a398fc68cf0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.cluster.Partition
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
 import kafka.server.epoch.util.MockBlockingSender
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.TestUtils
@@ -332,7 +332,7 @@ class ReplicaFetcherThreadTest {
       config,
       failedPartitions,
       replicaManager,
-      UnboundedQuota,
+      UNBOUNDED_QUOTA,
       mockNetwork
     )
     thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), 
t1p1 -> initialFetchState(Some(topicId1), 0L)))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 97370de9f9f..c4eb5e30f5b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -263,7 +263,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
       replicaManager.fetchMessages(
         params = fetchParams,
         fetchInfos = Seq(topicIdPartition -> partitionData),
-        quota = QuotaFactory.UnboundedQuota,
+        quota = QuotaFactory.UNBOUNDED_QUOTA,
         responseCallback = fetchCallback,
       )
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ddc9cac8d81..245e0063873 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,7 +24,7 @@ import kafka.log.{LogManager, LogManagerTest, UnifiedLog}
 import kafka.log.remote.RemoteLogManager
 import 
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
 import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
+import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.epoch.util.MockBlockingSender
 import kafka.server.share.DelayedShareFetch
 import kafka.utils.TestUtils.waitUntilTrue
@@ -3311,7 +3311,7 @@ class ReplicaManagerTest {
     maxWaitMs: Long = 0,
     minBytes: Int = 1,
     maxBytes: Int = 1024 * 1024,
-    quota: ReplicaQuota = UnboundedQuota,
+    quota: ReplicaQuota = UNBOUNDED_QUOTA,
     isolation: FetchIsolation = FetchIsolation.LOG_END,
     clientMetadata: Option[ClientMetadata] = None
   ): Unit = {
@@ -4031,7 +4031,7 @@ class ReplicaManagerTest {
 
       val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 
1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
       // when reading log, it'll throw OffsetOutOfRangeException, which will 
be handled separately
-      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new 
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), 
Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
+      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new 
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), 
Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
 
       if (isFromFollower) {
         // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from 
follower, since the data is already available in remote log
@@ -4099,7 +4099,7 @@ class ReplicaManagerTest {
       }
 
       // when reading log, it'll throw OffsetOutOfRangeException, which will 
be handled separately
-      replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
+      replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UNBOUNDED_QUOTA, fetchCallback)
 
       val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = 
ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
       if (isFromFollower) {
@@ -4198,7 +4198,7 @@ class ReplicaManagerTest {
 
       // create 5 asyncRead tasks, which should enqueue 3 task
       for (i <- 1 to 5)
-        replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
+        replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UNBOUNDED_QUOTA, fetchCallback)
 
       // wait until at least 2 task submitted to use all the available threads
       queueLatch.await(5000, TimeUnit.MILLISECONDS)
@@ -4308,7 +4308,7 @@ class ReplicaManagerTest {
       }).when(spyRLM).read(any())
 
       val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
-      replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
+      replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UNBOUNDED_QUOTA, fetchCallback)
       // advancing the clock to expire the delayed remote fetch
       timer.advanceClock(2000L)
 
@@ -6823,7 +6823,7 @@ class ReplicaManagerTest {
       replicaManager.readFromLog(
         params,
         Seq(new TopicIdPartition(topicId, 0, topic) -> new 
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), 
Optional.of(leaderEpoch))),
-        UnboundedQuota,
+        UNBOUNDED_QUOTA,
         readFromPurgatory = false)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index ff90883a4a4..acb5c22ec76 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -39,7 +39,7 @@ class ReplicationQuotaManagerTest {
   @Test
   def shouldThrottleOnlyDefinedReplicas(): Unit = {
     val quota = new ReplicationQuotaManager(new 
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time)
-    quota.markThrottled("topic1", Seq(1, 2, 3))
+    quota.markThrottled("topic1", java.util.List.of(1, 2, 3))
 
     assertTrue(quota.isThrottled(tp1(1)))
     assertTrue(quota.isThrottled(tp1(2)))
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index e0c931eeef8..a6463cfd443 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -116,7 +116,7 @@ public class KRaftMetadataRequestBenchmark {
     private final ReplicationQuotaManager replicaQuotaManager = 
Mockito.mock(ReplicationQuotaManager.class);
     private final QuotaFactory.QuotaManagers quotaManagers = new 
QuotaFactory.QuotaManagers(clientQuotaManager,
             clientQuotaManager, clientRequestQuotaManager, 
controllerMutationQuotaManager, replicaQuotaManager,
-            replicaQuotaManager, replicaQuotaManager, Option.empty());
+            replicaQuotaManager, replicaQuotaManager, Optional.empty());
     private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
     private final BrokerTopicStats brokerTopicStats = new 
BrokerTopicStats(false);
     private final KafkaPrincipal principal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");

Reply via email to