This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f086d188fe KAFKA-18892: Add KIP-877 support for ClientQuotaCallback 
(#19068)
2f086d188fe is described below

commit 2f086d188fe42db290d7037e83f552075e8bfcaa
Author: Ken Huang <[email protected]>
AuthorDate: Tue Apr 8 22:58:29 2025 +0800

    KAFKA-18892: Add KIP-877 support for ClientQuotaCallback (#19068)
    
    Allow ClientQuotaCallback to implement Monitorable and register metrics.
    
    Reviewers: Mickael Maison <[email protected]>, TaiJuWu
    <[email protected]>, Jhen-Yung Hsu <[email protected]>
---
 .../server/quota/CustomQuotaCallbackTest.java      | 91 ++++++++++++++++++----
 .../org/apache/kafka/common/internals/Plugin.java  |  6 ++
 .../kafka/server/quota/ClientQuotaCallback.java    |  7 ++
 .../kafka/server/ClientRequestQuotaManager.java    | 10 ++-
 core/src/main/java/kafka/server/QuotaFactory.java  | 55 +++++++++----
 .../src/main/scala/kafka/server/BrokerServer.scala |  4 +-
 .../scala/kafka/server/ClientQuotaManager.scala    | 13 +++-
 .../server/ControllerMutationQuotaManager.scala    |  3 +-
 .../main/scala/kafka/server/ControllerServer.scala |  3 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 26 ++++---
 .../DynamicTopicClusterQuotaPublisher.scala        |  4 +-
 .../kafka/server/KRaftClusterTest.scala            |  4 +-
 .../kafka/server/LocalLeaderEndPointTest.scala     |  2 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |  6 +-
 .../server/HighwatermarkPersistenceTest.scala      |  4 +-
 .../unit/kafka/server/IsrExpirationTest.scala      |  2 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |  2 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  2 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |  2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |  2 +-
 22 files changed, 183 insertions(+), 69 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
index e602ae081d7..da42c7a3007 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
@@ -19,39 +19,44 @@ package org.apache.kafka.server.quota;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.TestUtils;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
-import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.config.QuotaConfig;
 
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-@ClusterTestDefaults(controllers = 3, 
-    types = {Type.KRAFT},
-    serverProperties = {
-        @ClusterConfigProperty(id = 3000, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3001, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-        @ClusterConfigProperty(id = 3002, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
-    }
-)
-public class CustomQuotaCallbackTest {
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-    private final ClusterInstance cluster;
+public class CustomQuotaCallbackTest {
 
-    public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
-        this.cluster = clusterInstance;
+    private static int controllerId(Type type) {
+        return type == Type.KRAFT ? 3000 : 0;
     }
 
-    @ClusterTest
-    public void testCustomQuotaCallbackWithControllerServer() throws 
InterruptedException {
-        
+    @ClusterTest(
+        controllers = 3,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(id = 3000, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+            @ClusterConfigProperty(id = 3001, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+            @ClusterConfigProperty(id = 3002, key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+        }
+    )
+    public void testCustomQuotaCallbackWithControllerServer(ClusterInstance 
cluster) throws InterruptedException {
+
         try (Admin admin = cluster.admin(Map.of())) {
             admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
             TestUtils.waitForCondition(
@@ -69,10 +74,49 @@ public class CustomQuotaCallbackTest {
                         && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0), 
                     "The CustomQuotaCallback not triggered in all controllers. 
"
             );
-            
+        
+        }
+    }
+
+    @ClusterTest(
+        types = {Type.CO_KRAFT, Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = 
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$MonitorableCustomQuotaCallback"),
         }
+    )
+    public void 
testMonitorableCustomQuotaCallbackWithCombinedMode(ClusterInstance cluster) {
+        assertMetrics(
+            cluster.brokers().get(0).metrics(),
+            expectedTags(Map.of("role", "broker"))
+        );
+        assertMetrics(
+            cluster.controllers().get(controllerId(cluster.type())).metrics(),
+            expectedTags(Map.of("role", "controller"))
+        );
     }
 
+    private void assertMetrics(Metrics metrics, Map<String, String> 
expectedTags) {
+        int found = 0;
+        for (MetricName metricName : metrics.metrics().keySet()) {
+            if (metricName.group().equals("plugins")) {
+                Map<String, String> tags = metricName.tags();
+                if (expectedTags.equals(tags)) {
+                    assertEquals(MonitorableCustomQuotaCallback.METRIC_NAME, 
metricName.name());
+                    
assertEquals(MonitorableCustomQuotaCallback.METRIC_DESCRIPTION, 
metricName.description());
+                    found++;
+                }
+            }
+        }
+        assertEquals(1, found);
+    }
+
+    private static Map<String, String> expectedTags(Map<String, String> 
extraTags) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("config", QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG);
+        tags.put("class", 
MonitorableCustomQuotaCallback.class.getSimpleName());
+        tags.putAll(extraTags);
+        return tags;
+    }
 
     public static class CustomQuotaCallback implements ClientQuotaCallback {
 
@@ -121,4 +165,17 @@ public class CustomQuotaCallbackTest {
         }
 
     }
+
+    public static class MonitorableCustomQuotaCallback extends 
CustomQuotaCallback implements Monitorable {
+        
+        private static final String METRIC_NAME = 
"monitorable-custom-quota-callback-name";
+        private static final String METRIC_DESCRIPTION = 
"monitorable-custom-quota-callback-description";
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName metricName = metrics.metricName(METRIC_NAME, 
METRIC_DESCRIPTION, Map.of());
+            metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> 1);
+        }
+        
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java 
b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
index 620cd0c07ec..587f9b74a8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
@@ -44,6 +44,12 @@ public class Plugin<T> implements Supplier<T>, AutoCloseable 
{
         return wrapInstance(instance, metrics, () -> tags(key, instance));
     }
 
+    public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, 
String key, Map<String, String> extraTags) {
+        Map<String, String> tags = tags(key, instance);
+        tags.putAll(extraTags);
+        return wrapInstance(instance, metrics, () -> tags);
+    }
+
     private static <T> Map<String, String> tags(String key, T instance) {
         Map<String, String> tags = new LinkedHashMap<>();
         tags.put("config", key);
diff --git 
a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java 
b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
index a9cb2bfb2af..01a8181d861 100644
--- 
a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
+++ 
b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
@@ -24,6 +24,13 @@ import java.util.Map;
 
 /**
  * Quota callback interface for brokers and controllers that enables 
customization of client quota computation.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
callback to register metrics. 
+ * The following tags are automatically added to all metrics registered: 
+ * <ul>
+ *     <li><code>config</code> set to 
<code>client.quota.callback.class</code></li>
+ *     <li><code>class</code> set to the ClientQuotaCallback class name</li>
+ *     <li><code>role</code> set to broker/controller, which indicates the 
role of the server</li>
+ * </ul>
  */
 public interface ClientQuotaCallback extends Configurable {
 
diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java 
b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
index f93c50b2783..1d069587987 100644
--- a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -19,6 +19,7 @@ package kafka.server;
 import kafka.network.RequestChannel;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.QuotaViolationException;
 import org.apache.kafka.common.metrics.Sensor;
@@ -49,8 +50,13 @@ public class ClientRequestQuotaManager extends 
ClientQuotaManager {
     // Visible for testing
     private final Sensor exemptSensor;
 
-    public ClientRequestQuotaManager(ClientQuotaManagerConfig config, Metrics 
metrics, Time time, String threadNamePrefix, Optional<ClientQuotaCallback> 
quotaCallback) {
-        super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, 
OptionConverters.toScala(quotaCallback));
+    public ClientRequestQuotaManager(
+            ClientQuotaManagerConfig config, 
+            Metrics metrics, Time time, 
+            String threadNamePrefix, 
+            Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
+    ) {
+        super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, 
OptionConverters.toScala(quotaCallbackPlugin));
         this.maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
         this.metrics = metrics;
         this.exemptMetricName = metrics.metricName("exempt-request-time", 
QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization 
percentage");
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java 
b/core/src/main/java/kafka/server/QuotaFactory.java
index 57e9139e285..de9c8470789 100644
--- a/core/src/main/java/kafka/server/QuotaFactory.java
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -17,17 +17,21 @@
 package kafka.server;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.config.ClientQuotaManagerConfig;
 import org.apache.kafka.server.config.QuotaConfig;
 import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
 import org.apache.kafka.server.quota.ClientQuotaCallback;
 import org.apache.kafka.server.quota.QuotaType;
 
+import java.util.Map;
 import java.util.Optional;
 
 import scala.Option;
+import scala.jdk.javaapi.OptionConverters;
 
 public class QuotaFactory {
 
@@ -56,12 +60,12 @@ public class QuotaFactory {
         private final ReplicationQuotaManager leader;
         private final ReplicationQuotaManager follower;
         private final ReplicationQuotaManager alterLogDirs;
-        private final Optional<ClientQuotaCallback> clientQuotaCallback;
+        private final Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin;
 
         public QuotaManagers(ClientQuotaManager fetch, ClientQuotaManager 
produce, ClientRequestQuotaManager request,
                              ControllerMutationQuotaManager 
controllerMutation, ReplicationQuotaManager leader,
                              ReplicationQuotaManager follower, 
ReplicationQuotaManager alterLogDirs,
-                             Optional<ClientQuotaCallback> 
clientQuotaCallback) {
+                             Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin) {
             this.fetch = fetch;
             this.produce = produce;
             this.request = request;
@@ -69,7 +73,7 @@ public class QuotaFactory {
             this.leader = leader;
             this.follower = follower;
             this.alterLogDirs = alterLogDirs;
-            this.clientQuotaCallback = clientQuotaCallback;
+            this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
         }
 
         public ClientQuotaManager fetch() {
@@ -100,8 +104,8 @@ public class QuotaFactory {
             return alterLogDirs;
         }
 
-        public Optional<ClientQuotaCallback> clientQuotaCallback() {
-            return clientQuotaCallback;
+        public Optional<Plugin<ClientQuotaCallback>> 
clientQuotaCallbackPlugin() {
+            return clientQuotaCallbackPlugin;
         }
 
         public void shutdown() {
@@ -109,26 +113,47 @@ public class QuotaFactory {
             produce.shutdown();
             request.shutdown();
             controllerMutation.shutdown();
-            clientQuotaCallback.ifPresent(ClientQuotaCallback::close);
+            clientQuotaCallbackPlugin.ifPresent(plugin -> 
Utils.closeQuietly(plugin, "client quota callback plugin"));
         }
     }
 
-    public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics, 
Time time, String threadNamePrefix) {
-        ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
-            QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, 
ClientQuotaCallback.class);
+    public static QuotaManagers instantiate(
+        KafkaConfig cfg,
+        Metrics metrics,
+        Time time,
+        String threadNamePrefix,
+        String role
+    ) {
+        Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin = 
createClientQuotaCallback(cfg, metrics, role);
+        Option<Plugin<ClientQuotaCallback>> clientQuotaCallbackPluginOption = 
OptionConverters.toScala(clientQuotaCallbackPlugin);
 
         return new QuotaManagers(
-            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
-            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
-            new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, Optional.ofNullable(clientQuotaCallback)),
-            new 
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, 
time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.FETCH, time, threadNamePrefix, clientQuotaCallbackPluginOption),
+            new ClientQuotaManager(clientConfig(cfg), metrics, 
QuotaType.PRODUCE, time, threadNamePrefix, clientQuotaCallbackPluginOption),
+            new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, 
threadNamePrefix, clientQuotaCallbackPlugin),
+            new 
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, 
time, threadNamePrefix, clientQuotaCallbackPluginOption),
             new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.LEADER_REPLICATION, time),
             new ReplicationQuotaManager(replicationConfig(cfg), metrics, 
QuotaType.FOLLOWER_REPLICATION, time),
             new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), 
metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
-            Optional.ofNullable(clientQuotaCallback)
+            clientQuotaCallbackPlugin
         );
     }
 
+    private static Optional<Plugin<ClientQuotaCallback>> 
createClientQuotaCallback(
+        KafkaConfig cfg, 
+        Metrics metrics, 
+        String role
+    ) {
+        ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
+            QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, 
ClientQuotaCallback.class);
+        return clientQuotaCallback == null ? Optional.empty() : 
Optional.of(Plugin.wrapInstance(
+            clientQuotaCallback,
+            metrics,
+            QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
+            Map.of("role", role)
+        ));
+    }
+
     private static ClientQuotaManagerConfig clientConfig(KafkaConfig cfg) {
         return new ClientQuotaManagerConfig(
             cfg.quotaConfig().numQuotaSamples(),
@@ -156,4 +181,4 @@ public class QuotaFactory {
             cfg.quotaConfig().alterLogDirsReplicationQuotaWindowSizeSeconds()
         );
     }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0823ec683f7..31ebc14c960 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import 
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DelayedActionQueue}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DelayedActionQueue, ProcessRole}
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -196,7 +196,7 @@ class BrokerServer(
       val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
 
       config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
-      quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-")
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString)
       DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, 
config, quotaManagers)
 
       /* start scheduler */
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index d8346e6ab85..894e410fc63 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -23,6 +23,7 @@ import java.util.function.Consumer
 import kafka.network.RequestChannel
 import kafka.server.ClientQuotaManager._
 import kafka.utils.Logging
+import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.{Cluster, MetricName}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.Metrics
@@ -137,22 +138,26 @@ object ClientQuotaManager {
  * @param quotaType Quota type of this quota manager
  * @param time @Time object to use
  * @param threadNamePrefix The thread prefix to use
- * @param clientQuotaCallback An optional @ClientQuotaCallback
+ * @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and 
+ *                                  wrap it in a {@link 
org.apache.kafka.common.internals.Plugin}
  */
 class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val quotaType: QuotaType,
                          private val time: Time,
                          private val threadNamePrefix: String,
-                         private val clientQuotaCallback: 
Option[ClientQuotaCallback] = None) extends Logging {
+                         private val clientQuotaCallbackPlugin: 
Option[Plugin[ClientQuotaCallback]] = None) extends Logging {
 
   private val lock = new ReentrantReadWriteLock()
   private val sensorAccessor = new SensorAccess(lock, metrics)
-  private val quotaCallback = clientQuotaCallback.getOrElse(new 
DefaultQuotaCallback)
+  private val quotaCallback = clientQuotaCallbackPlugin match {
+    case Some(plugin) => plugin.get()
+    case None => new DefaultQuotaCallback
+  }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
 
   @volatile
-  private var quotaTypesEnabled = clientQuotaCallback match {
+  private var quotaTypesEnabled = clientQuotaCallbackPlugin match {
     case Some(_) => QuotaTypes.CustomQuotas
     case None => QuotaTypes.NoQuotas
   }
diff --git 
a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala 
b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index cc3a8892217..bfb9feedbf7 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import kafka.network.RequestChannel
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.QuotaViolationException
 import org.apache.kafka.common.metrics.Sensor
@@ -165,7 +166,7 @@ class ControllerMutationQuotaManager(private val config: 
ClientQuotaManagerConfi
                                      private val metrics: Metrics,
                                      private val time: Time,
                                      private val threadNamePrefix: String,
-                                     private val quotaCallback: 
Option[ClientQuotaCallback])
+                                     private val quotaCallback: 
Option[Plugin[ClientQuotaCallback]])
     extends ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, 
time, threadNamePrefix, quotaCallback) {
 
   override protected def clientQuotaMetricName(quotaMetricTags: Map[String, 
String]): MetricName = {
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index f79e4a9e41d..e7537878ca6 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, 
NodeToControllerChannelManager}
@@ -266,7 +267,7 @@ class ControllerServer(
       quotaManagers = QuotaFactory.instantiate(config,
         metrics,
         time,
-        s"controller-${config.nodeId}-")
+        s"controller-${config.nodeId}-", ProcessRole.ControllerRole.toString)
       clientQuotaMetadataManager = new 
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 6c4066c9db2..6db9e0a6d0a 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -920,25 +920,31 @@ class DynamicClientQuotaCallback(
 
   override def reconfigurableConfigs(): util.Set[String] = {
     val configs = new util.HashSet[String]()
-    quotaManagers.clientQuotaCallback.ifPresent {
-      case callback: Reconfigurable => 
configs.addAll(callback.reconfigurableConfigs)
-      case _ =>
+    quotaManagers.clientQuotaCallbackPlugin.ifPresent { plugin =>
+      plugin.get() match {
+        case callback: Reconfigurable => 
configs.addAll(callback.reconfigurableConfigs)
+        case _ =>
+      }
     }
     configs
   }
 
   override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
-    quotaManagers.clientQuotaCallback.ifPresent {
-      case callback: Reconfigurable => 
callback.validateReconfiguration(configs)
-      case _ =>
+    quotaManagers.clientQuotaCallbackPlugin.ifPresent { plugin =>
+      plugin.get() match {
+        case callback: Reconfigurable => 
callback.validateReconfiguration(configs)
+        case _ =>
+      }
     }
   }
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
-    quotaManagers.clientQuotaCallback.ifPresent {
-      case callback: Reconfigurable =>
-        serverConfig.dynamicConfig.maybeReconfigure(callback, 
serverConfig.dynamicConfig.currentKafkaConfig, configs)
-      case _ =>
+    quotaManagers.clientQuotaCallbackPlugin.ifPresent { plugin =>
+      plugin.get() match {
+        case callback: Reconfigurable =>
+          serverConfig.dynamicConfig.maybeReconfigure(callback, 
serverConfig.dynamicConfig.currentKafkaConfig, configs)
+        case _ =>
+      }
     }
   }
 }
diff --git 
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
 
b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
index 64e21940447..7798c18b4d6 100644
--- 
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
+++ 
b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
@@ -50,10 +50,10 @@ class DynamicTopicClusterQuotaPublisher (
     newImage: MetadataImage,
   ): Unit = {
     try {
-      quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
+      quotaManagers.clientQuotaCallbackPlugin().ifPresent(plugin => {
         if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
           val cluster = MetadataCache.toCluster(clusterId, newImage)
-          if (clientQuotaCallback.updateClusterMetadata(cluster)) {
+          if (plugin.get().updateClusterMetadata(cluster)) {
             quotaManagers.fetch.updateQuotaMetricConfigs()
             quotaManagers.produce.updateQuotaMetricConfigs()
             quotaManagers.request.updateQuotaMetricConfigs()
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index d51a7eb325d..ea9f036e7f4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1188,9 +1188,9 @@ class KRaftClusterTest {
     def assertConfigValue(expected: Int): Unit = {
       TestUtils.retry(60000) {
         assertEquals(expected, 
cluster.controllers().values().iterator().next().
-          
quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback].value)
+          
quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value)
         assertEquals(expected, cluster.brokers().values().iterator().next().
-          
quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback].value)
+          
quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value)
       }
     }
 
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala 
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 64cf66db3ad..ae9cdc36b6b 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -62,7 +62,7 @@ class LocalLeaderEndPointTest extends Logging {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
     val alterPartitionManager = mock(classOf[AlterPartitionManager])
     val metrics = new Metrics
-    quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+    quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
     replicaManager = new ReplicaManager(
       metrics = metrics,
       config = config,
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 816752b2706..8030c5060dd 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -469,7 +469,7 @@ class DynamicBrokerConfigTest {
     val metrics: Metrics = mock(classOf[Metrics])
     when(kafkaServer.metrics).thenReturn(metrics)
     val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
-    when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
+    when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
     when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
     
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -515,7 +515,7 @@ class DynamicBrokerConfigTest {
     val metrics: Metrics = mock(classOf[Metrics])
     when(controllerServer.metrics).thenReturn(metrics)
     val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
-    when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
+    when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
     when(controllerServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
     
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -560,7 +560,7 @@ class DynamicBrokerConfigTest {
     val metrics: Metrics = mock(classOf[Metrics])
     when(controllerServer.metrics).thenReturn(metrics)
     val quotaManagers: QuotaFactory.QuotaManagers = 
mock(classOf[QuotaFactory.QuotaManagers])
-    when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
+    when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
     when(controllerServer.quotaManagers).thenReturn(quotaManagers)
     val socketServer: SocketServer = mock(classOf[SocketServer])
     
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 5d3177adea5..378c38e530f 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -64,7 +64,7 @@ class HighwatermarkPersistenceTest {
     scheduler.startup()
     val metrics = new Metrics
     val time = new MockTime
-    val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, 
"")
+    val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, 
"", "")
     // create replica manager
     val replicaManager = new ReplicaManager(
       metrics = metrics,
@@ -122,7 +122,7 @@ class HighwatermarkPersistenceTest {
     scheduler.startup()
     val metrics = new Metrics
     val time = new MockTime
-    val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, 
"")
+    val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, 
"", "")
     // create replica manager
     val replicaManager = new ReplicaManager(
       metrics = metrics,
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 9d8836684ab..5ebf32accac 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -66,7 +66,7 @@ class IsrExpirationTest {
     when(logManager.liveLogDirs).thenReturn(Array.empty[File])
 
     alterIsrManager = TestUtils.createAlterIsrManager()
-    quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
+    quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "", 
"")
     replicaManager = new ReplicaManager(
       metrics = metrics,
       config = configs.head,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 6824f92d3dc..23ff3b71e97 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -183,7 +183,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
       time = time
     )
 
-    quotaManagers = QuotaFactory.instantiate(config, metrics, time, "")
+    quotaManagers = QuotaFactory.instantiate(config, metrics, time, "", "")
 
     new ReplicaManager(
       metrics = metrics,
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f4fb346be63..a7948ae901f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -298,7 +298,7 @@ class ReplicaManagerQuotasTest {
     val alterIsrManager: AlterPartitionManager = 
mock(classOf[AlterPartitionManager])
 
     val leaderBrokerId = configs.head.brokerId
-    quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
+    quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "", 
"")
     replicaManager = new ReplicaManager(
       metrics = metrics,
       config = configs.head,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a8e760c7b7a..61c9b681cd8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -139,7 +139,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1)
     config = KafkaConfig.fromProps(props)
     alterPartitionManager = mock(classOf[AlterPartitionManager])
-    quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+    quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
     mockRemoteLogManager = mock(classOf[RemoteLogManager])
     when(mockRemoteLogManager.fetchThrottleTimeSensor()).thenReturn(
       new RLMQuotaMetrics(metrics,
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 05ab66ec5fb..1e297fc33c9 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -50,7 +50,7 @@ class OffsetsForLeaderEpochTest {
 
   @BeforeEach
   def setUp(): Unit = {
-    quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+    quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
   }
 
   @Test
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 05532b7ea14..2db7f2bb1c2 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -120,7 +120,7 @@ public class CheckpointBench {
         this.quotaManagers =
                 QuotaFactory.instantiate(this.brokerProperties,
                         this.metrics,
-                        this.time, "");
+                        this.time, "", "");
 
         this.alterPartitionManager = TestUtils.createAlterIsrManager();
         this.replicaManager = new ReplicaManagerBuilder().
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 1a82b6c110b..4d544926afa 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -141,7 +141,7 @@ public class PartitionCreationBench {
             setTime(Time.SYSTEM).
             build();
         scheduler.startup();
-        this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, 
this.metrics, this.time, "");
+        this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, 
this.metrics, this.time, "", "");
         this.alterPartitionManager = TestUtils.createAlterIsrManager();
         this.replicaManager = new ReplicaManagerBuilder().
             setConfig(brokerProperties).


Reply via email to