This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2f086d188fe KAFKA-18892: Add KIP-877 support for ClientQuotaCallback (#19068)
2f086d188fe is described below
commit 2f086d188fe42db290d7037e83f552075e8bfcaa
Author: Ken Huang <[email protected]>
AuthorDate: Tue Apr 8 22:58:29 2025 +0800
KAFKA-18892: Add KIP-877 support for ClientQuotaCallback (#19068)
Allow ClientQuotaCallback to implement Monitorable and register metrics.
Reviewers: Mickael Maison <[email protected]>, TaiJuWu <[email protected]>, Jhen-Yung Hsu <[email protected]>
---
.../server/quota/CustomQuotaCallbackTest.java | 91 ++++++++++++++++++----
.../org/apache/kafka/common/internals/Plugin.java | 6 ++
.../kafka/server/quota/ClientQuotaCallback.java | 7 ++
.../kafka/server/ClientRequestQuotaManager.java | 10 ++-
core/src/main/java/kafka/server/QuotaFactory.java | 55 +++++++++----
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
.../scala/kafka/server/ClientQuotaManager.scala | 13 +++-
.../server/ControllerMutationQuotaManager.scala | 3 +-
.../main/scala/kafka/server/ControllerServer.scala | 3 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 26 ++++---
.../DynamicTopicClusterQuotaPublisher.scala | 4 +-
.../kafka/server/KRaftClusterTest.scala | 4 +-
.../kafka/server/LocalLeaderEndPointTest.scala | 2 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 6 +-
.../server/HighwatermarkPersistenceTest.scala | 4 +-
.../unit/kafka/server/IsrExpirationTest.scala | 2 +-
.../server/ReplicaManagerConcurrencyTest.scala | 2 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../server/epoch/OffsetsForLeaderEpochTest.scala | 2 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 2 +-
.../kafka/jmh/server/PartitionCreationBench.java | 2 +-
22 files changed, 183 insertions(+), 69 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
index e602ae081d7..da42c7a3007 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java
@@ -19,39 +19,44 @@ package org.apache.kafka.server.quota;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
-import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.QuotaConfig;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-@ClusterTestDefaults(controllers = 3,
- types = {Type.KRAFT},
- serverProperties = {
- @ClusterConfigProperty(id = 3000, key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
- @ClusterConfigProperty(id = 3001, key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
- @ClusterConfigProperty(id = 3002, key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
- }
-)
-public class CustomQuotaCallbackTest {
+import static org.junit.jupiter.api.Assertions.assertEquals;
- private final ClusterInstance cluster;
+public class CustomQuotaCallbackTest {
- public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
- this.cluster = clusterInstance;
+ private static int controllerId(Type type) {
+ return type == Type.KRAFT ? 3000 : 0;
}
- @ClusterTest
- public void testCustomQuotaCallbackWithControllerServer() throws
InterruptedException {
-
+ @ClusterTest(
+ controllers = 3,
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(id = 3000, key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+ @ClusterConfigProperty(id = 3001, key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+ @ClusterConfigProperty(id = 3002, key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
+ }
+ )
+ public void testCustomQuotaCallbackWithControllerServer(ClusterInstance
cluster) throws InterruptedException {
+
try (Admin admin = cluster.admin(Map.of())) {
admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
TestUtils.waitForCondition(
@@ -69,10 +74,49 @@ public class CustomQuotaCallbackTest {
&&
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter ->
counter.get() > 0),
"The CustomQuotaCallback not triggered in all controllers.
"
);
-
+
+ }
+ }
+
+ @ClusterTest(
+ types = {Type.CO_KRAFT, Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value =
"org.apache.kafka.server.quota.CustomQuotaCallbackTest$MonitorableCustomQuotaCallback"),
}
+ )
+ public void
testMonitorableCustomQuotaCallbackWithCombinedMode(ClusterInstance cluster) {
+ assertMetrics(
+ cluster.brokers().get(0).metrics(),
+ expectedTags(Map.of("role", "broker"))
+ );
+ assertMetrics(
+ cluster.controllers().get(controllerId(cluster.type())).metrics(),
+ expectedTags(Map.of("role", "controller"))
+ );
}
+ private void assertMetrics(Metrics metrics, Map<String, String>
expectedTags) {
+ int found = 0;
+ for (MetricName metricName : metrics.metrics().keySet()) {
+ if (metricName.group().equals("plugins")) {
+ Map<String, String> tags = metricName.tags();
+ if (expectedTags.equals(tags)) {
+ assertEquals(MonitorableCustomQuotaCallback.METRIC_NAME,
metricName.name());
+
assertEquals(MonitorableCustomQuotaCallback.METRIC_DESCRIPTION,
metricName.description());
+ found++;
+ }
+ }
+ }
+ assertEquals(1, found);
+ }
+
+ private static Map<String, String> expectedTags(Map<String, String>
extraTags) {
+ Map<String, String> tags = new LinkedHashMap<>();
+ tags.put("config", QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG);
+ tags.put("class",
MonitorableCustomQuotaCallback.class.getSimpleName());
+ tags.putAll(extraTags);
+ return tags;
+ }
public static class CustomQuotaCallback implements ClientQuotaCallback {
@@ -121,4 +165,17 @@ public class CustomQuotaCallbackTest {
}
}
+
+ public static class MonitorableCustomQuotaCallback extends
CustomQuotaCallback implements Monitorable {
+
+ private static final String METRIC_NAME =
"monitorable-custom-quota-callback-name";
+ private static final String METRIC_DESCRIPTION =
"monitorable-custom-quota-callback-description";
+
+ @Override
+ public void withPluginMetrics(PluginMetrics metrics) {
+ MetricName metricName = metrics.metricName(METRIC_NAME,
METRIC_DESCRIPTION, Map.of());
+ metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> 1);
+ }
+
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
index 620cd0c07ec..587f9b74a8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
@@ -44,6 +44,12 @@ public class Plugin<T> implements Supplier<T>, AutoCloseable
{
return wrapInstance(instance, metrics, () -> tags(key, instance));
}
+ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics,
String key, Map<String, String> extraTags) {
+ Map<String, String> tags = tags(key, instance);
+ tags.putAll(extraTags);
+ return wrapInstance(instance, metrics, () -> tags);
+ }
+
private static <T> Map<String, String> tags(String key, T instance) {
Map<String, String> tags = new LinkedHashMap<>();
tags.put("config", key);
diff --git
a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
index a9cb2bfb2af..01a8181d861 100644
---
a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
+++
b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java
@@ -24,6 +24,13 @@ import java.util.Map;
/**
* Quota callback interface for brokers and controllers that enables
customization of client quota computation.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the
callback to register metrics.
+ * The following tags are automatically added to all metrics registered:
+ * <ul>
+ * <li><code>config</code> set to
<code>client.quota.callback.class</code></li>
+ * <li><code>class</code> set to the ClientQuotaCallback class name</li>
+ * <li><code>role</code> set to broker/controller, which indicates the
role of the server</li>
+ * </ul>
*/
public interface ClientQuotaCallback extends Configurable {
diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
index f93c50b2783..1d069587987 100644
--- a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -19,6 +19,7 @@ package kafka.server;
import kafka.network.RequestChannel;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
@@ -49,8 +50,13 @@ public class ClientRequestQuotaManager extends
ClientQuotaManager {
// Visible for testing
private final Sensor exemptSensor;
- public ClientRequestQuotaManager(ClientQuotaManagerConfig config, Metrics
metrics, Time time, String threadNamePrefix, Optional<ClientQuotaCallback>
quotaCallback) {
- super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix,
OptionConverters.toScala(quotaCallback));
+ public ClientRequestQuotaManager(
+ ClientQuotaManagerConfig config,
+ Metrics metrics, Time time,
+ String threadNamePrefix,
+ Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
+ ) {
+ super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix,
OptionConverters.toScala(quotaCallbackPlugin));
this.maxThrottleTimeMs =
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
this.metrics = metrics;
this.exemptMetricName = metrics.metricName("exempt-request-time",
QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization
percentage");
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java
b/core/src/main/java/kafka/server/QuotaFactory.java
index 57e9139e285..de9c8470789 100644
--- a/core/src/main/java/kafka/server/QuotaFactory.java
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -17,17 +17,21 @@
package kafka.server;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.QuotaType;
+import java.util.Map;
import java.util.Optional;
import scala.Option;
+import scala.jdk.javaapi.OptionConverters;
public class QuotaFactory {
@@ -56,12 +60,12 @@ public class QuotaFactory {
private final ReplicationQuotaManager leader;
private final ReplicationQuotaManager follower;
private final ReplicationQuotaManager alterLogDirs;
- private final Optional<ClientQuotaCallback> clientQuotaCallback;
+ private final Optional<Plugin<ClientQuotaCallback>>
clientQuotaCallbackPlugin;
public QuotaManagers(ClientQuotaManager fetch, ClientQuotaManager
produce, ClientRequestQuotaManager request,
ControllerMutationQuotaManager
controllerMutation, ReplicationQuotaManager leader,
ReplicationQuotaManager follower,
ReplicationQuotaManager alterLogDirs,
- Optional<ClientQuotaCallback>
clientQuotaCallback) {
+ Optional<Plugin<ClientQuotaCallback>>
clientQuotaCallbackPlugin) {
this.fetch = fetch;
this.produce = produce;
this.request = request;
@@ -69,7 +73,7 @@ public class QuotaFactory {
this.leader = leader;
this.follower = follower;
this.alterLogDirs = alterLogDirs;
- this.clientQuotaCallback = clientQuotaCallback;
+ this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
}
public ClientQuotaManager fetch() {
@@ -100,8 +104,8 @@ public class QuotaFactory {
return alterLogDirs;
}
- public Optional<ClientQuotaCallback> clientQuotaCallback() {
- return clientQuotaCallback;
+ public Optional<Plugin<ClientQuotaCallback>>
clientQuotaCallbackPlugin() {
+ return clientQuotaCallbackPlugin;
}
public void shutdown() {
@@ -109,26 +113,47 @@ public class QuotaFactory {
produce.shutdown();
request.shutdown();
controllerMutation.shutdown();
- clientQuotaCallback.ifPresent(ClientQuotaCallback::close);
+ clientQuotaCallbackPlugin.ifPresent(plugin ->
Utils.closeQuietly(plugin, "client quota callback plugin"));
}
}
- public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics,
Time time, String threadNamePrefix) {
- ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
- QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
ClientQuotaCallback.class);
+ public static QuotaManagers instantiate(
+ KafkaConfig cfg,
+ Metrics metrics,
+ Time time,
+ String threadNamePrefix,
+ String role
+ ) {
+ Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin =
createClientQuotaCallback(cfg, metrics, role);
+ Option<Plugin<ClientQuotaCallback>> clientQuotaCallbackPluginOption =
OptionConverters.toScala(clientQuotaCallbackPlugin);
return new QuotaManagers(
- new ClientQuotaManager(clientConfig(cfg), metrics,
QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
- new ClientQuotaManager(clientConfig(cfg), metrics,
QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
- new ClientRequestQuotaManager(clientConfig(cfg), metrics, time,
threadNamePrefix, Optional.ofNullable(clientQuotaCallback)),
- new
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics,
time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+ new ClientQuotaManager(clientConfig(cfg), metrics,
QuotaType.FETCH, time, threadNamePrefix, clientQuotaCallbackPluginOption),
+ new ClientQuotaManager(clientConfig(cfg), metrics,
QuotaType.PRODUCE, time, threadNamePrefix, clientQuotaCallbackPluginOption),
+ new ClientRequestQuotaManager(clientConfig(cfg), metrics, time,
threadNamePrefix, clientQuotaCallbackPlugin),
+ new
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics,
time, threadNamePrefix, clientQuotaCallbackPluginOption),
new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.LEADER_REPLICATION, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.FOLLOWER_REPLICATION, time),
new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg),
metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
- Optional.ofNullable(clientQuotaCallback)
+ clientQuotaCallbackPlugin
);
}
+ private static Optional<Plugin<ClientQuotaCallback>>
createClientQuotaCallback(
+ KafkaConfig cfg,
+ Metrics metrics,
+ String role
+ ) {
+ ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
+ QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
ClientQuotaCallback.class);
+ return clientQuotaCallback == null ? Optional.empty() :
Optional.of(Plugin.wrapInstance(
+ clientQuotaCallback,
+ metrics,
+ QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
+ Map.of("role", role)
+ ));
+ }
+
private static ClientQuotaManagerConfig clientConfig(KafkaConfig cfg) {
return new ClientQuotaManagerConfig(
cfg.quotaConfig().numQuotaSamples(),
@@ -156,4 +181,4 @@ public class QuotaFactory {
cfg.quotaConfig().alterLogDirsReplicationQuotaWindowSizeSeconds()
);
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0823ec683f7..31ebc14c960 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DelayedActionQueue}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DelayedActionQueue, ProcessRole}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -196,7 +196,7 @@ class BrokerServer(
val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
- quotaManagers = QuotaFactory.instantiate(config, metrics, time,
s"broker-${config.nodeId}-")
+ quotaManagers = QuotaFactory.instantiate(config, metrics, time,
s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString)
DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager,
config, quotaManagers)
/* start scheduler */
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index d8346e6ab85..894e410fc63 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -23,6 +23,7 @@ import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.server.ClientQuotaManager._
import kafka.utils.Logging
+import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
@@ -137,22 +138,26 @@ object ClientQuotaManager {
* @param quotaType Quota type of this quota manager
* @param time @Time object to use
* @param threadNamePrefix The thread prefix to use
- * @param clientQuotaCallback An optional @ClientQuotaCallback
+ * @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and
+ * wrap it in a {@link
org.apache.kafka.common.internals.Plugin}
*/
class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val quotaType: QuotaType,
private val time: Time,
private val threadNamePrefix: String,
- private val clientQuotaCallback:
Option[ClientQuotaCallback] = None) extends Logging {
+ private val clientQuotaCallbackPlugin:
Option[Plugin[ClientQuotaCallback]] = None) extends Logging {
private val lock = new ReentrantReadWriteLock()
private val sensorAccessor = new SensorAccess(lock, metrics)
- private val quotaCallback = clientQuotaCallback.getOrElse(new
DefaultQuotaCallback)
+ private val quotaCallback = clientQuotaCallbackPlugin match {
+ case Some(plugin) => plugin.get()
+ case None => new DefaultQuotaCallback
+ }
private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
@volatile
- private var quotaTypesEnabled = clientQuotaCallback match {
+ private var quotaTypesEnabled = clientQuotaCallbackPlugin match {
case Some(_) => QuotaTypes.CustomQuotas
case None => QuotaTypes.NoQuotas
}
diff --git
a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index cc3a8892217..bfb9feedbf7 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
import kafka.network.RequestChannel
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
+import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.QuotaViolationException
import org.apache.kafka.common.metrics.Sensor
@@ -165,7 +166,7 @@ class ControllerMutationQuotaManager(private val config:
ClientQuotaManagerConfi
private val metrics: Metrics,
private val time: Time,
private val threadNamePrefix: String,
- private val quotaCallback:
Option[ClientQuotaCallback])
+ private val quotaCallback:
Option[Plugin[ClientQuotaCallback]])
extends ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION,
time, threadNamePrefix, quotaCallback) {
override protected def clientQuotaMetricName(quotaMetricTags: Map[String,
String]): MetricName = {
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index f79e4a9e41d..e7537878ca6 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion,
NodeToControllerChannelManager}
@@ -266,7 +267,7 @@ class ControllerServer(
quotaManagers = QuotaFactory.instantiate(config,
metrics,
time,
- s"controller-${config.nodeId}-")
+ s"controller-${config.nodeId}-", ProcessRole.ControllerRole.toString)
clientQuotaMetadataManager = new
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 6c4066c9db2..6db9e0a6d0a 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -920,25 +920,31 @@ class DynamicClientQuotaCallback(
override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
- quotaManagers.clientQuotaCallback.ifPresent {
- case callback: Reconfigurable =>
configs.addAll(callback.reconfigurableConfigs)
- case _ =>
+ quotaManagers.clientQuotaCallbackPlugin.ifPresent { plugin =>
+ plugin.get() match {
+ case callback: Reconfigurable =>
configs.addAll(callback.reconfigurableConfigs)
+ case _ =>
+ }
}
configs
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
- quotaManagers.clientQuotaCallback.ifPresent {
- case callback: Reconfigurable =>
callback.validateReconfiguration(configs)
- case _ =>
+ quotaManagers.clientQuotaCallbackPlugin.ifPresent { plugin =>
+ plugin.get() match {
+ case callback: Reconfigurable =>
callback.validateReconfiguration(configs)
+ case _ =>
+ }
}
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
- quotaManagers.clientQuotaCallback.ifPresent {
- case callback: Reconfigurable =>
- serverConfig.dynamicConfig.maybeReconfigure(callback,
serverConfig.dynamicConfig.currentKafkaConfig, configs)
- case _ =>
+ quotaManagers.clientQuotaCallbackPlugin.ifPresent { plugin =>
+ plugin.get() match {
+ case callback: Reconfigurable =>
+ serverConfig.dynamicConfig.maybeReconfigure(callback,
serverConfig.dynamicConfig.currentKafkaConfig, configs)
+ case _ =>
+ }
}
}
}
diff --git
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
index 64e21940447..7798c18b4d6 100644
---
a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
+++
b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala
@@ -50,10 +50,10 @@ class DynamicTopicClusterQuotaPublisher (
newImage: MetadataImage,
): Unit = {
try {
- quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
+ quotaManagers.clientQuotaCallbackPlugin().ifPresent(plugin => {
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
val cluster = MetadataCache.toCluster(clusterId, newImage)
- if (clientQuotaCallback.updateClusterMetadata(cluster)) {
+ if (plugin.get().updateClusterMetadata(cluster)) {
quotaManagers.fetch.updateQuotaMetricConfigs()
quotaManagers.produce.updateQuotaMetricConfigs()
quotaManagers.request.updateQuotaMetricConfigs()
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index d51a7eb325d..ea9f036e7f4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1188,9 +1188,9 @@ class KRaftClusterTest {
def assertConfigValue(expected: Int): Unit = {
TestUtils.retry(60000) {
assertEquals(expected,
cluster.controllers().values().iterator().next().
-
quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback].value)
+
quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value)
assertEquals(expected, cluster.brokers().values().iterator().next().
-
quotaManagers.clientQuotaCallback.get.asInstanceOf[DummyClientQuotaCallback].value)
+
quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value)
}
}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index 64cf66db3ad..ae9cdc36b6b 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -62,7 +62,7 @@ class LocalLeaderEndPointTest extends Logging {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new
File(_)))
val alterPartitionManager = mock(classOf[AlterPartitionManager])
val metrics = new Metrics
- quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+ quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
replicaManager = new ReplicaManager(
metrics = metrics,
config = config,
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 816752b2706..8030c5060dd 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -469,7 +469,7 @@ class DynamicBrokerConfigTest {
val metrics: Metrics = mock(classOf[Metrics])
when(kafkaServer.metrics).thenReturn(metrics)
val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
- when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
+ when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -515,7 +515,7 @@ class DynamicBrokerConfigTest {
val metrics: Metrics = mock(classOf[Metrics])
when(controllerServer.metrics).thenReturn(metrics)
val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
- when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
+ when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
when(controllerServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -560,7 +560,7 @@ class DynamicBrokerConfigTest {
val metrics: Metrics = mock(classOf[Metrics])
when(controllerServer.metrics).thenReturn(metrics)
val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
- when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
+ when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
when(controllerServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
diff --git
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 5d3177adea5..378c38e530f 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -64,7 +64,7 @@ class HighwatermarkPersistenceTest {
scheduler.startup()
val metrics = new Metrics
val time = new MockTime
- val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time,
"")
+ val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time,
"", "")
// create replica manager
val replicaManager = new ReplicaManager(
metrics = metrics,
@@ -122,7 +122,7 @@ class HighwatermarkPersistenceTest {
scheduler.startup()
val metrics = new Metrics
val time = new MockTime
- val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time,
"")
+ val quotaManager = QuotaFactory.instantiate(configs.head, metrics, time,
"", "")
// create replica manager
val replicaManager = new ReplicaManager(
metrics = metrics,
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 9d8836684ab..5ebf32accac 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -66,7 +66,7 @@ class IsrExpirationTest {
when(logManager.liveLogDirs).thenReturn(Array.empty[File])
alterIsrManager = TestUtils.createAlterIsrManager()
- quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
+ quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "",
"")
replicaManager = new ReplicaManager(
metrics = metrics,
config = configs.head,
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 6824f92d3dc..23ff3b71e97 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -183,7 +183,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
time = time
)
- quotaManagers = QuotaFactory.instantiate(config, metrics, time, "")
+ quotaManagers = QuotaFactory.instantiate(config, metrics, time, "", "")
new ReplicaManager(
metrics = metrics,
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f4fb346be63..a7948ae901f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -298,7 +298,7 @@ class ReplicaManagerQuotasTest {
val alterIsrManager: AlterPartitionManager =
mock(classOf[AlterPartitionManager])
val leaderBrokerId = configs.head.brokerId
- quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
+ quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "",
"")
replicaManager = new ReplicaManager(
metrics = metrics,
config = configs.head,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a8e760c7b7a..61c9b681cd8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -139,7 +139,7 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1)
config = KafkaConfig.fromProps(props)
alterPartitionManager = mock(classOf[AlterPartitionManager])
- quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+ quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
mockRemoteLogManager = mock(classOf[RemoteLogManager])
when(mockRemoteLogManager.fetchThrottleTimeSensor()).thenReturn(
new RLMQuotaMetrics(metrics,
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 05ab66ec5fb..1e297fc33c9 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -50,7 +50,7 @@ class OffsetsForLeaderEpochTest {
@BeforeEach
def setUp(): Unit = {
- quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+ quotaManager = QuotaFactory.instantiate(config, metrics, time, "", "")
}
@Test
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 05532b7ea14..2db7f2bb1c2 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -120,7 +120,7 @@ public class CheckpointBench {
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,
- this.time, "");
+ this.time, "", "");
this.alterPartitionManager = TestUtils.createAlterIsrManager();
this.replicaManager = new ReplicaManagerBuilder().
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 1a82b6c110b..4d544926afa 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -141,7 +141,7 @@ public class PartitionCreationBench {
setTime(Time.SYSTEM).
build();
scheduler.startup();
- this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties,
this.metrics, this.time, "");
+ this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties,
this.metrics, this.time, "", "");
this.alterPartitionManager = TestUtils.createAlterIsrManager();
this.replicaManager = new ReplicaManagerBuilder().
setConfig(brokerProperties).