This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fb6e9ec1c0 KAFKA-17840 Move ReplicationQuotaManager,
ClientRequestQuotaManager and QuotaFactory to server module (#17609)
7fb6e9ec1c0 is described below
commit 7fb6e9ec1c059289740f72c0e27babad24d9beec
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Oct 30 21:18:28 2024 +0800
KAFKA-17840 Move ReplicationQuotaManager, ClientRequestQuotaManager and
QuotaFactory to server module (#17609)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/ClientRequestQuotaManager.java | 107 +++++++++++++
core/src/main/java/kafka/server/QuotaFactory.java | 159 ++++++++++++++++++
core/src/main/java/kafka/server/ReplicaQuota.java | 25 +++
.../java/kafka/server/ReplicationQuotaManager.java | 154 ++++++++++++++++++
.../java/kafka/server/share/DelayedShareFetch.java | 2 +-
.../main/scala/kafka/network/RequestChannel.scala | 8 +-
.../kafka/server/ClientRequestQuotaManager.scala | 94 -----------
.../main/scala/kafka/server/ConfigHandler.scala | 5 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 9 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 6 +-
.../scala/kafka/server/LocalLeaderEndPoint.scala | 4 +-
.../src/main/scala/kafka/server/QuotaFactory.scala | 97 -----------
.../kafka/server/ReplicationQuotaManager.scala | 177 ---------------------
.../kafka/server/LocalLeaderEndPointTest.scala | 2 +-
.../kafka/server/RemoteLeaderEndPointTest.scala | 2 +-
.../server/ClientRequestQuotaManagerTest.scala | 6 +-
.../unit/kafka/server/ControllerApisTest.scala | 10 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 8 +-
.../kafka/server/DynamicConfigChangeTest.scala | 5 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 4 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 4 +-
.../server/ReplicaManagerConcurrencyTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 14 +-
.../kafka/server/ReplicationQuotaManagerTest.scala | 2 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
26 files changed, 496 insertions(+), 416 deletions(-)
diff --git a/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
new file mode 100644
index 00000000000..f93c50b2783
--- /dev/null
+++ b/core/src/main/java/kafka/server/ClientRequestQuotaManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.network.RequestChannel;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import scala.jdk.javaapi.CollectionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
+@SuppressWarnings("this-escape")
+public class ClientRequestQuotaManager extends ClientQuotaManager {
+ // Since exemptSensor is for all clients and has a constant name, we do
not expire exemptSensor and only
+ // create once.
+ static final double NANOS_TO_PERCENTAGE_PER_SECOND = 100.0 /
TimeUnit.SECONDS.toNanos(1);
+ private static final long
DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS = Long.MAX_VALUE;
+ private static final String EXEMPT_SENSOR_NAME = "exempt-" +
QuotaType.REQUEST;
+
+ private final long maxThrottleTimeMs;
+ private final Metrics metrics;
+ private final MetricName exemptMetricName;
+ // Visible for testing
+ private final Sensor exemptSensor;
+
+ public ClientRequestQuotaManager(ClientQuotaManagerConfig config, Metrics
metrics, Time time, String threadNamePrefix, Optional<ClientQuotaCallback>
quotaCallback) {
+ super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix,
OptionConverters.toScala(quotaCallback));
+ this.maxThrottleTimeMs =
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
+ this.metrics = metrics;
+ this.exemptMetricName = metrics.metricName("exempt-request-time",
QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization
percentage");
+ exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME,
DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor ->
sensor.add(exemptMetricName, new Rate()));
+ }
+
+ public Sensor exemptSensor() {
+ return exemptSensor;
+ }
+
+ private void recordExempt(double value) {
+ exemptSensor.record(value);
+ }
+
+ /**
+ * Records that a user/clientId changed request processing time being
throttled. If quota has been violated, return
+ * throttle time in milliseconds. Throttle time calculation may be
overridden by sub-classes.
+ * @param request client request
+ * @return Number of milliseconds to throttle in case of quota violation.
Zero otherwise
+ */
+ public int maybeRecordAndGetThrottleTimeMs(RequestChannel.Request request,
long timeMs) {
+ if (quotasEnabled()) {
+ request.setRecordNetworkThreadTimeCallback(timeNanos -> {
+ recordNoThrottle(request.session(),
request.header().clientId(),
nanosToPercentage(Long.parseLong(timeNanos.toString())));
+ });
+ return recordAndGetThrottleTimeMs(request.session(),
request.header().clientId(),
nanosToPercentage(request.requestThreadTimeNanos()), timeMs);
+ } else {
+ return 0;
+ }
+ }
+
+ public void maybeRecordExempt(RequestChannel.Request request) {
+ if (quotasEnabled()) {
+ request.setRecordNetworkThreadTimeCallback(timeNanos -> {
+
recordExempt(nanosToPercentage(Long.parseLong(timeNanos.toString())));
+ });
+ recordExempt(nanosToPercentage(request.requestThreadTimeNanos()));
+ }
+ }
+
+ @Override
+ public long throttleTime(QuotaViolationException e, long timeMs) {
+ return QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs);
+ }
+
+ @Override
+ public MetricName
clientQuotaMetricName(scala.collection.immutable.Map<String, String>
quotaMetricTags) {
+ return metrics.metricName("request-time",
QuotaType.REQUEST.toString(), "Tracking request-time per user/client-id",
CollectionConverters.asJava(quotaMetricTags));
+ }
+
+ private double nanosToPercentage(long nanos) {
+ return nanos * NANOS_TO_PERCENTAGE_PER_SECOND;
+ }
+}
diff --git a/core/src/main/java/kafka/server/QuotaFactory.java
b/core/src/main/java/kafka/server/QuotaFactory.java
new file mode 100644
index 00000000000..57e9139e285
--- /dev/null
+++ b/core/src/main/java/kafka/server/QuotaFactory.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.QuotaType;
+
+import java.util.Optional;
+
+import scala.Option;
+
+public class QuotaFactory {
+
+ public static final ReplicaQuota UNBOUNDED_QUOTA = new ReplicaQuota() {
+ @Override
+ public boolean isThrottled(TopicPartition topicPartition) {
+ return false;
+ }
+
+ @Override
+ public boolean isQuotaExceeded() {
+ return false;
+ }
+
+ @Override
+ public void record(long value) {
+ // No-op
+ }
+ };
+
+ public static class QuotaManagers {
+ private final ClientQuotaManager fetch;
+ private final ClientQuotaManager produce;
+ private final ClientRequestQuotaManager request;
+ private final ControllerMutationQuotaManager controllerMutation;
+ private final ReplicationQuotaManager leader;
+ private final ReplicationQuotaManager follower;
+ private final ReplicationQuotaManager alterLogDirs;
+ private final Optional<ClientQuotaCallback> clientQuotaCallback;
+
+ public QuotaManagers(ClientQuotaManager fetch, ClientQuotaManager
produce, ClientRequestQuotaManager request,
+ ControllerMutationQuotaManager
controllerMutation, ReplicationQuotaManager leader,
+ ReplicationQuotaManager follower,
ReplicationQuotaManager alterLogDirs,
+ Optional<ClientQuotaCallback>
clientQuotaCallback) {
+ this.fetch = fetch;
+ this.produce = produce;
+ this.request = request;
+ this.controllerMutation = controllerMutation;
+ this.leader = leader;
+ this.follower = follower;
+ this.alterLogDirs = alterLogDirs;
+ this.clientQuotaCallback = clientQuotaCallback;
+ }
+
+ public ClientQuotaManager fetch() {
+ return fetch;
+ }
+
+ public ClientQuotaManager produce() {
+ return produce;
+ }
+
+ public ClientRequestQuotaManager request() {
+ return request;
+ }
+
+ public ControllerMutationQuotaManager controllerMutation() {
+ return controllerMutation;
+ }
+
+ public ReplicationQuotaManager leader() {
+ return leader;
+ }
+
+ public ReplicationQuotaManager follower() {
+ return follower;
+ }
+
+ public ReplicationQuotaManager alterLogDirs() {
+ return alterLogDirs;
+ }
+
+ public Optional<ClientQuotaCallback> clientQuotaCallback() {
+ return clientQuotaCallback;
+ }
+
+ public void shutdown() {
+ fetch.shutdown();
+ produce.shutdown();
+ request.shutdown();
+ controllerMutation.shutdown();
+ clientQuotaCallback.ifPresent(ClientQuotaCallback::close);
+ }
+ }
+
+ public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics,
Time time, String threadNamePrefix) {
+ ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
+ QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
ClientQuotaCallback.class);
+
+ return new QuotaManagers(
+ new ClientQuotaManager(clientConfig(cfg), metrics,
QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+ new ClientQuotaManager(clientConfig(cfg), metrics,
QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+ new ClientRequestQuotaManager(clientConfig(cfg), metrics, time,
threadNamePrefix, Optional.ofNullable(clientQuotaCallback)),
+ new
ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics,
time, threadNamePrefix, Option.apply(clientQuotaCallback)),
+ new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.LEADER_REPLICATION, time),
+ new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.FOLLOWER_REPLICATION, time),
+ new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg),
metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
+ Optional.ofNullable(clientQuotaCallback)
+ );
+ }
+
+ private static ClientQuotaManagerConfig clientConfig(KafkaConfig cfg) {
+ return new ClientQuotaManagerConfig(
+ cfg.quotaConfig().numQuotaSamples(),
+ cfg.quotaConfig().quotaWindowSizeSeconds()
+ );
+ }
+
+ private static ClientQuotaManagerConfig
clientControllerMutationConfig(KafkaConfig cfg) {
+ return new ClientQuotaManagerConfig(
+ cfg.quotaConfig().numControllerQuotaSamples(),
+ cfg.quotaConfig().controllerQuotaWindowSizeSeconds()
+ );
+ }
+
+ private static ReplicationQuotaManagerConfig replicationConfig(KafkaConfig
cfg) {
+ return new ReplicationQuotaManagerConfig(
+ cfg.quotaConfig().numReplicationQuotaSamples(),
+ cfg.quotaConfig().replicationQuotaWindowSizeSeconds()
+ );
+ }
+
+ private static ReplicationQuotaManagerConfig
alterLogDirsReplicationConfig(KafkaConfig cfg) {
+ return new ReplicationQuotaManagerConfig(
+ cfg.quotaConfig().numAlterLogDirsReplicationQuotaSamples(),
+ cfg.quotaConfig().alterLogDirsReplicationQuotaWindowSizeSeconds()
+ );
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/java/kafka/server/ReplicaQuota.java
b/core/src/main/java/kafka/server/ReplicaQuota.java
new file mode 100644
index 00000000000..7d65ba25001
--- /dev/null
+++ b/core/src/main/java/kafka/server/ReplicaQuota.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface ReplicaQuota {
+ void record(long value);
+ boolean isThrottled(TopicPartition topicPartition);
+ boolean isQuotaExceeded();
+}
\ No newline at end of file
diff --git a/core/src/main/java/kafka/server/ReplicationQuotaManager.java
b/core/src/main/java/kafka/server/ReplicationQuotaManager.java
new file mode 100644
index 00000000000..9301e59dfe6
--- /dev/null
+++ b/core/src/main/java/kafka/server/ReplicationQuotaManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.SensorAccess;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class ReplicationQuotaManager implements ReplicaQuota {
+ public static final List<Integer> ALL_REPLICAS = List.of(-1);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ReplicationQuotaManager.class);
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ConcurrentHashMap<String, List<Integer>> throttledPartitions
= new ConcurrentHashMap<>();
+ private final ReplicationQuotaManagerConfig config;
+ private final Metrics metrics;
+ private final QuotaType replicationType;
+ private final Time time;
+ private final SensorAccess sensorAccess;
+ private final MetricName rateMetricName;
+ private Quota quota;
+
+ public ReplicationQuotaManager(ReplicationQuotaManagerConfig config,
Metrics metrics, QuotaType replicationType, Time time) {
+ this.config = config;
+ this.metrics = metrics;
+ this.replicationType = replicationType;
+ this.time = time;
+ this.sensorAccess = new SensorAccess(lock, metrics);
+ this.rateMetricName = metrics.metricName("byte-rate",
replicationType.toString(), "Tracking byte-rate for " + replicationType);
+ }
+
+ /**
+ * Update the quota
+ */
+ public void updateQuota(Quota quota) {
+ lock.writeLock().lock();
+ try {
+ this.quota = quota;
+ KafkaMetric metric = metrics.metrics().get(rateMetricName);
+ if (metric != null) {
+ metric.config(getQuotaMetricConfig(quota));
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Check if the quota is currently exceeded
+ */
+ @Override
+ public boolean isQuotaExceeded() {
+ try {
+ sensor().checkQuotas();
+ return false;
+ } catch (QuotaViolationException qve) {
+ LOGGER.trace("{}: Quota violated for sensor ({}), metric: ({}),
metric-value: ({}), bound: ({})",
+ replicationType, sensor().name(), qve.metric().metricName(),
qve.value(), qve.bound());
+ return true;
+ }
+ }
+
+ /**
+ * Is the passed partition throttled by this ReplicationQuotaManager
+ */
+ @Override
+ public boolean isThrottled(TopicPartition topicPartition) {
+ List<Integer> partitions =
throttledPartitions.get(topicPartition.topic());
+ return partitions != null && (partitions.equals(ALL_REPLICAS) ||
partitions.contains(topicPartition.partition()));
+ }
+
+ /**
+ * Add the passed value to the throttled rate. This method ignores the
quota with
+ * the value being added to the rate even if the quota is exceeded
+ */
+ @Override
+ public void record(long value) {
+ sensor().record((double) value, time.milliseconds(), false);
+ }
+
+ /**
+ * Update the set of throttled partitions for this QuotaManager. The
partitions passed, for
+ * any single topic, will replace any previous
+ */
+ public void markThrottled(String topic, List<Integer> partitions) {
+ throttledPartitions.put(topic, partitions);
+ }
+
+ /**
+ * Mark all replicas for this topic as throttled
+ */
+ public void markThrottled(String topic) {
+ markThrottled(topic, ALL_REPLICAS);
+ }
+
+ public void removeThrottle(String topic) {
+ throttledPartitions.remove(topic);
+ }
+
+ public long upperBound() {
+ lock.readLock().lock();
+ try {
+ return quota != null ? (long) quota.bound() : Long.MAX_VALUE;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private MetricConfig getQuotaMetricConfig(Quota quota) {
+ return new MetricConfig()
+ .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+ .samples(config.numQuotaSamples)
+ .quota(quota);
+ }
+
+ private Sensor sensor() {
+ return sensorAccess.getOrCreate(
+ replicationType.toString(),
+
ReplicationQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
+ sensor -> sensor.add(rateMetricName, new SimpleRate(),
getQuotaMetricConfig(quota))
+ );
+ }
+}
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 732381a1976..f04b5186b8b 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -109,7 +109,7 @@ public class DelayedShareFetch extends DelayedOperation {
topicPartitionData.entrySet().stream().map(entry ->
new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
),
- QuotaFactory.UnboundedQuota$.MODULE$,
+ QuotaFactory.UNBOUNDED_QUOTA,
true);
Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index c48811439e2..44f5e926eb9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -73,7 +73,7 @@ object RequestChannel extends Logging {
@volatile var messageConversionsTimeNanos: Long = 0L
@volatile var apiThrottleTimeMs: Long = 0L
@volatile var temporaryMemoryBytes: Long = 0L
- @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
+ @volatile var recordNetworkThreadTimeCallback:
Option[java.util.function.Consumer[java.lang.Long]] = None
@volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
@volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
@@ -247,7 +247,7 @@ object RequestChannel extends Logging {
// The time recorded here is the time spent on the network thread for
receiving this request
// and sending the response. Note that for the first request on a
connection, the time includes
// the total time spent on authentication, which may be significant for
SASL/SSL.
- recordNetworkThreadTimeCallback.foreach(record =>
record(networkThreadTimeNanos))
+ recordNetworkThreadTimeCallback.foreach(record =>
record.accept(networkThreadTimeNanos))
if (isRequestLoggingEnabled) {
val desc = RequestConvertToJson.requestDescMetrics(header,
requestLog.toJava, response.responseLog.toJava,
@@ -272,6 +272,10 @@ object RequestChannel extends Logging {
}
}
+ def setRecordNetworkThreadTimeCallback(callback:
java.util.function.Consumer[java.lang.Long]): Unit = {
+ recordNetworkThreadTimeCallback = Some(callback)
+ }
+
override def toString: String = s"Request(processor=$processor, " +
s"connectionId=${context.connectionId}, " +
s"session=$session, " +
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
deleted file mode 100644
index 248a7cb5125..00000000000
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.TimeUnit
-import kafka.network.RequestChannel
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.Rate
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.ClientQuotaManagerConfig
-import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType,
QuotaUtils}
-
-import scala.jdk.CollectionConverters._
-
-object ClientRequestQuotaManager {
- val NanosToPercentagePerSecond: Double = 100.0 / TimeUnit.SECONDS.toNanos(1)
- // Since exemptSensor is for all clients and has a constant name, we do not
expire exemptSensor and only
- // create once.
- private val DefaultInactiveExemptSensorExpirationTimeSeconds: Long =
Long.MaxValue
-
- private val ExemptSensorName = "exempt-" + QuotaType.REQUEST
-}
-
-class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
- private val metrics: Metrics,
- private val time: Time,
- private val threadNamePrefix: String,
- private val quotaCallback:
Option[ClientQuotaCallback])
- extends ClientQuotaManager(config, metrics, QuotaType.REQUEST, time,
threadNamePrefix, quotaCallback) {
-
- private val maxThrottleTimeMs =
TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
- private val exemptMetricName = metrics.metricName("exempt-request-time",
- QuotaType.REQUEST.toString, "Tracking exempt-request-time utilization
percentage")
-
- val exemptSensor: Sensor =
getOrCreateSensor(ClientRequestQuotaManager.ExemptSensorName,
- ClientRequestQuotaManager.DefaultInactiveExemptSensorExpirationTimeSeconds,
- sensor => sensor.add(exemptMetricName, new Rate))
-
- private def recordExempt(value: Double): Unit = {
- exemptSensor.record(value)
- }
-
- /**
- * Records that a user/clientId changed request processing time being
throttled. If quota has been violated, return
- * throttle time in milliseconds. Throttle time calculation may be
overridden by sub-classes.
- * @param request client request
- * @return Number of milliseconds to throttle in case of quota violation.
Zero otherwise
- */
- def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request, timeMs:
Long): Int = {
- if (quotasEnabled) {
- request.recordNetworkThreadTimeCallback = Some(timeNanos =>
recordNoThrottle(
- request.session, request.header.clientId,
nanosToPercentage(timeNanos)))
- recordAndGetThrottleTimeMs(request.session, request.header.clientId,
- nanosToPercentage(request.requestThreadTimeNanos), timeMs)
- } else {
- 0
- }
- }
-
- def maybeRecordExempt(request: RequestChannel.Request): Unit = {
- if (quotasEnabled) {
- request.recordNetworkThreadTimeCallback = Some(timeNanos =>
recordExempt(nanosToPercentage(timeNanos)))
- recordExempt(nanosToPercentage(request.requestThreadTimeNanos))
- }
- }
-
- override protected def throttleTime(e: QuotaViolationException, timeMs:
Long): Long = {
- QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
- }
-
- override protected def clientQuotaMetricName(quotaMetricTags: Map[String,
String]): MetricName = {
- metrics.metricName("request-time", QuotaType.REQUEST.toString,
- "Tracking request-time per user/client-id",
- quotaMetricTags.asJava)
- }
-
- private def nanosToPercentage(nanos: Long): Double =
- nanos * ClientRequestQuotaManager.NanosToPercentagePerSecond
-}
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 3f665c4eb91..35c521bfa63 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -22,7 +22,6 @@ import java.util.{Collections, Properties}
import kafka.controller.KafkaController
import kafka.log.UnifiedLog
import kafka.network.ConnectionQuotas
-import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs,
ZooKeeperInternals}
@@ -132,7 +131,7 @@ class TopicConfigHandler(private val replicaManager:
ReplicaManager,
def updateThrottledList(prop: String, quotaManager:
ReplicationQuotaManager): Unit = {
if (topicConfig.containsKey(prop) &&
topicConfig.getProperty(prop).nonEmpty) {
val partitions = parseThrottledPartitions(topicConfig,
kafkaConfig.brokerId, prop)
- quotaManager.markThrottled(topic, partitions)
+ quotaManager.markThrottled(topic,
partitions.map(Integer.valueOf).asJava)
debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic:
$topic and partitions $partitions")
} else {
quotaManager.removeThrottle(topic)
@@ -152,7 +151,7 @@ class TopicConfigHandler(private val replicaManager:
ReplicaManager,
ThrottledReplicaListValidator.ensureValidString(prop, configValue)
configValue match {
case "" => Seq()
- case "*" => AllReplicas
+ case "*" =>
ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq
case _ => configValue.trim
.split(",")
.map(_.split(":"))
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 79585349268..9f69a44c919 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1016,7 +1016,7 @@ class DynamicClientQuotaCallback(
override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
- quotaManagers.clientQuotaCallback.foreach {
+ quotaManagers.clientQuotaCallback.ifPresent {
case callback: Reconfigurable =>
configs.addAll(callback.reconfigurableConfigs)
case _ =>
}
@@ -1024,18 +1024,17 @@ class DynamicClientQuotaCallback(
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
- quotaManagers.clientQuotaCallback.foreach {
+ quotaManagers.clientQuotaCallback.ifPresent {
case callback: Reconfigurable =>
callback.validateReconfiguration(configs)
case _ =>
}
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
- quotaManagers.clientQuotaCallback.foreach {
+ quotaManagers.clientQuotaCallback.ifPresent {
case callback: Reconfigurable =>
serverConfig.dynamicConfig.maybeReconfigure(callback,
serverConfig.dynamicConfig.currentKafkaConfig, configs)
- true
- case _ => false
+ case _ =>
}
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c2f24381258..b4bb9a69198 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,7 +20,7 @@ package kafka.server
import kafka.controller.ReplicaAssignment
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.network.RequestChannel
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
+import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import kafka.server.share.SharePartitionManager
@@ -406,7 +406,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- quotas.clientQuotaCallback.foreach { callback =>
+ quotas.clientQuotaCallback.ifPresent { callback =>
if
(callback.updateClusterMetadata(metadataCache.getClusterMetadata(clusterId,
request.context.listenerName))) {
quotas.fetch.updateQuotaMetricConfigs()
quotas.produce.updateQuotaMetricConfigs()
@@ -1073,7 +1073,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def replicationQuota(fetchRequest: FetchRequest): ReplicaQuota =
- if (fetchRequest.isFromFollower) quotas.leader else UnboundedQuota
+ if (fetchRequest.isFromFollower) quotas.leader else UNBOUNDED_QUOTA
def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 73b661f0bb4..03258295a41 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.utils.Logging
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.FetchResponseData
@@ -105,7 +105,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
replicaManager.fetchMessages(
params = fetchParams,
fetchInfos = fetchData.asScala.toSeq,
- quota = UnboundedQuota,
+ quota = UNBOUNDED_QUOTA,
responseCallback = processResponseCallback
)
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala
b/core/src/main/scala/kafka/server/QuotaFactory.scala
deleted file mode 100644
index 6d295a96369..00000000000
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.server.quota.{ClientQuotaCallback, QuotaType}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.{ClientQuotaManagerConfig, QuotaConfig,
ReplicationQuotaManagerConfig}
-
-
-object QuotaFactory extends Logging {
-
- object UnboundedQuota extends ReplicaQuota {
- override def isThrottled(topicPartition: TopicPartition): Boolean = false
- override def isQuotaExceeded: Boolean = false
- def record(value: Long): Unit = ()
- }
-
- case class QuotaManagers(fetch: ClientQuotaManager,
- produce: ClientQuotaManager,
- request: ClientRequestQuotaManager,
- controllerMutation: ControllerMutationQuotaManager,
- leader: ReplicationQuotaManager,
- follower: ReplicationQuotaManager,
- alterLogDirs: ReplicationQuotaManager,
- clientQuotaCallback: Option[ClientQuotaCallback]) {
- def shutdown(): Unit = {
- fetch.shutdown()
- produce.shutdown()
- request.shutdown()
- controllerMutation.shutdown()
- clientQuotaCallback.foreach(_.close())
- }
- }
-
- def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time,
threadNamePrefix: String): QuotaManagers = {
-
- val clientQuotaCallback =
Option(cfg.getConfiguredInstance(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
- classOf[ClientQuotaCallback]))
- QuotaManagers(
- new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH,
time, threadNamePrefix, clientQuotaCallback),
- new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE,
time, threadNamePrefix, clientQuotaCallback),
- new ClientRequestQuotaManager(clientConfig(cfg), metrics, time,
threadNamePrefix, clientQuotaCallback),
- new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg),
metrics, time,
- threadNamePrefix, clientQuotaCallback),
- new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.LEADER_REPLICATION, time),
- new ReplicationQuotaManager(replicationConfig(cfg), metrics,
QuotaType.FOLLOWER_REPLICATION, time),
- new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics,
QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
- clientQuotaCallback
- )
- }
-
- def clientConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
- new ClientQuotaManagerConfig(
- cfg.quotaConfig.numQuotaSamples,
- cfg.quotaConfig.quotaWindowSizeSeconds
- )
- }
-
- private def clientControllerMutationConfig(cfg: KafkaConfig):
ClientQuotaManagerConfig = {
- new ClientQuotaManagerConfig(
- cfg.quotaConfig.numControllerQuotaSamples,
- cfg.quotaConfig.controllerQuotaWindowSizeSeconds
- )
- }
-
- private def replicationConfig(cfg: KafkaConfig):
ReplicationQuotaManagerConfig = {
- new ReplicationQuotaManagerConfig(
- cfg.quotaConfig.numReplicationQuotaSamples,
- cfg.quotaConfig.replicationQuotaWindowSizeSeconds
- )
- }
-
- private def alterLogDirsReplicationConfig(cfg: KafkaConfig):
ReplicationQuotaManagerConfig = {
- new ReplicationQuotaManagerConfig(
- cfg.quotaConfig.numAlterLogDirsReplicationQuotaSamples,
- cfg.quotaConfig.alterLogDirsReplicationQuotaWindowSizeSeconds
- )
- }
-
-}
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
deleted file mode 100644
index 6ae4b5aa284..00000000000
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.collection.Seq
-import kafka.server.Constants._
-import kafka.utils.CoreUtils._
-import kafka.utils.Logging
-import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.metrics.stats.SimpleRate
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
-import org.apache.kafka.server.quota.{QuotaType, SensorAccess}
-
-trait ReplicaQuota {
- def record(value: Long): Unit
- def isThrottled(topicPartition: TopicPartition): Boolean
- def isQuotaExceeded: Boolean
-}
-
-object Constants {
- val AllReplicas = Seq[Int](-1)
-}
-
-/**
- * Tracks replication metrics and comparing them to any quotas for throttled
partitions.
- *
- * @param config The quota configs
- * @param metrics The Metrics instance
- * @param replicationType The name / key for this quota manager, typically
leader or follower
- * @param time Time object to use
- */
-class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
- private val metrics: Metrics,
- private val replicationType: QuotaType,
- private val time: Time) extends Logging with
ReplicaQuota {
- private val lock = new ReentrantReadWriteLock()
- private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
- private var quota: Quota = _
- private val sensorAccess = new SensorAccess(lock, metrics)
- private val rateMetricName = metrics.metricName("byte-rate",
replicationType.toString,
- s"Tracking byte-rate for $replicationType")
-
- /**
- * Update the quota
- *
- * @param quota
- */
- def updateQuota(quota: Quota): Unit = {
- inWriteLock(lock) {
- this.quota = quota
- //The metric could be expired by another thread, so use a local variable
and null check.
- val metric = metrics.metrics.get(rateMetricName)
- if (metric != null) {
- metric.config(getQuotaMetricConfig(quota))
- }
- }
- }
-
- /**
- * Check if the quota is currently exceeded
- *
- * @return
- */
- override def isQuotaExceeded: Boolean = {
- try {
- sensor().checkQuotas()
- } catch {
- case qve: QuotaViolationException =>
- trace(s"$replicationType: Quota violated for sensor
(${sensor().name}), metric: (${qve.metric.metricName}), " +
- s"metric-value: (${qve.value}), bound: (${qve.bound})")
- return true
- }
- false
- }
-
- /**
- * Is the passed partition throttled by this ReplicationQuotaManager
- *
- * @param topicPartition the partition to check
- * @return
- */
- override def isThrottled(topicPartition: TopicPartition): Boolean = {
- val partitions = throttledPartitions.get(topicPartition.topic)
- if (partitions != null)
- (partitions eq AllReplicas) ||
partitions.contains(topicPartition.partition)
- else false
- }
-
- /**
- * Add the passed value to the throttled rate. This method ignores the
quota with
- * the value being added to the rate even if the quota is exceeded
- *
- * @param value
- */
- def record(value: Long): Unit = {
- sensor().record(value.toDouble, time.milliseconds(), false)
- }
-
- /**
- * Update the set of throttled partitions for this QuotaManager. The
partitions passed, for
- * any single topic, will replace any previous
- *
- * @param topic
- * @param partitions the set of throttled partitions
- * @return
- */
- def markThrottled(topic: String, partitions: Seq[Int]): Unit = {
- throttledPartitions.put(topic, partitions)
- }
-
- /**
- * Mark all replicas for this topic as throttled
- *
- * @param topic
- * @return
- */
- def markThrottled(topic: String): Unit = {
- markThrottled(topic, AllReplicas)
- }
-
- /**
- * Remove list of throttled replicas for a certain topic
- *
- * @param topic
- * @return
- */
- def removeThrottle(topic: String): Unit = {
- throttledPartitions.remove(topic)
- }
-
- /**
- * Returns the bound of the configured quota
- *
- * @return
- */
- def upperBound: Long = {
- inReadLock(lock) {
- if (quota != null)
- quota.bound.toLong
- else
- Long.MaxValue
- }
- }
-
- private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
- new MetricConfig()
- .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
- .samples(config.numQuotaSamples)
- .quota(quota)
- }
-
- private def sensor(): Sensor = {
- sensorAccess.getOrCreate(
- replicationType.toString,
- ReplicationQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
- sensor => sensor.add(rateMetricName, new SimpleRate,
getQuotaMetricConfig(quota))
- )
- }
-}
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
index cbb8ef3866d..6832b3a0cf7 100644
--- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -80,7 +80,7 @@ class LocalLeaderEndPointTest extends Logging {
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
replicaManager.getPartitionOrException(topicPartition)
.localLogOrException
- endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager,
QuotaFactory.UnboundedQuota)
+ endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager,
QuotaFactory.UNBOUNDED_QUOTA)
}
@AfterEach
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
index eaffe282d36..514ca0a1b28 100644
--- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -65,7 +65,7 @@ class RemoteLeaderEndPointTest {
blockingSend = new MockBlockingSender(offsets = new
util.HashMap[TopicPartition, EpochEndOffset](),
sourceBroker = sourceBroker, time = time)
endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend,
fetchSessionHandler,
- config, replicaManager, QuotaFactory.UnboundedQuota, () =>
MetadataVersion.MINIMUM_KRAFT_VERSION, () => currentBrokerEpoch)
+ config, replicaManager, QuotaFactory.UNBOUNDED_QUOTA, () =>
MetadataVersion.MINIMUM_KRAFT_VERSION, () => currentBrokerEpoch)
}
@Test
diff --git
a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
index ec09f3a7d6d..2b5471653d9 100644
--- a/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientRequestQuotaManagerTest.scala
@@ -22,15 +22,17 @@ import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import java.util.Optional
+
class ClientRequestQuotaManagerTest extends BaseClientQuotaManagerTest {
private val config = new ClientQuotaManagerConfig()
@Test
def testRequestPercentageQuotaViolation(): Unit = {
- val clientRequestQuotaManager = new ClientRequestQuotaManager(config,
metrics, time, "", None)
+ val clientRequestQuotaManager = new ClientRequestQuotaManager(config,
metrics, time, "", Optional.empty())
clientRequestQuotaManager.updateQuota(Some("ANONYMOUS"),
Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
val queueSizeMetric =
metrics.metrics().get(metrics.metricName("queue-size",
QuotaType.REQUEST.toString, ""))
- def millisToPercent(millis: Double) = millis * 1000 * 1000 *
ClientRequestQuotaManager.NanosToPercentagePerSecond
+ def millisToPercent(millis: Double) = millis * 1000 * 1000 *
ClientRequestQuotaManager.NANOS_TO_PERCENTAGE_PER_SECOND
try {
// We have 10 second windows. Make sure that there is no quota violation
// if we are under the quota
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index c8ead4aef57..ec09e10840d 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -73,7 +73,7 @@ import java.util
import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -127,7 +127,7 @@ class ControllerApisTest {
private val raftManager: RaftManager[ApiMessageAndVersion] =
mock(classOf[RaftManager[ApiMessageAndVersion]])
private val metadataCache: KRaftMetadataCache =
MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
- private val quotasNeverThrottleControllerMutations = QuotaManagers(
+ private val quotasNeverThrottleControllerMutations = new QuotaManagers(
clientQuotaManager,
clientQuotaManager,
clientRequestQuotaManager,
@@ -135,9 +135,9 @@ class ControllerApisTest {
replicaQuotaManager,
replicaQuotaManager,
replicaQuotaManager,
- None)
+ Optional.empty())
- private val quotasAlwaysThrottleControllerMutations = QuotaManagers(
+ private val quotasAlwaysThrottleControllerMutations = new QuotaManagers(
clientQuotaManager,
clientQuotaManager,
clientRequestQuotaManager,
@@ -145,7 +145,7 @@ class ControllerApisTest {
replicaQuotaManager,
replicaQuotaManager,
replicaQuotaManager,
- None)
+ Optional.empty())
private var controllerApis: ControllerApis = _
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 4eea92330c9..768be4786bd 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -18,7 +18,7 @@
package kafka.server
import java.{lang, util}
-import java.util.{Properties, Map => JMap}
+import java.util.{Optional, Properties, Map => JMap}
import java.util.concurrent.{CompletionStage, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import kafka.controller.KafkaController
@@ -490,7 +490,7 @@ class DynamicBrokerConfigTest {
val metrics: Metrics = mock(classOf[Metrics])
when(kafkaServer.metrics).thenReturn(metrics)
val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
- when(quotaManagers.clientQuotaCallback).thenReturn(None)
+ when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -537,7 +537,7 @@ class DynamicBrokerConfigTest {
val metrics: Metrics = mock(classOf[Metrics])
when(controllerServer.metrics).thenReturn(metrics)
val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
- when(quotaManagers.clientQuotaCallback).thenReturn(None)
+ when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
when(controllerServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
@@ -583,7 +583,7 @@ class DynamicBrokerConfigTest {
val metrics: Metrics = mock(classOf[Metrics])
when(controllerServer.metrics).thenReturn(metrics)
val quotaManagers: QuotaFactory.QuotaManagers =
mock(classOf[QuotaFactory.QuotaManagers])
- when(quotaManagers.clientQuotaCallback).thenReturn(None)
+ when(quotaManagers.clientQuotaCallback).thenReturn(Optional.empty())
when(controllerServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index a0687cbbcec..971cf0595be 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -20,7 +20,6 @@ import kafka.cluster.Partition
import kafka.integration.KafkaServerTestHarness
import kafka.log.UnifiedLog
import kafka.log.remote.RemoteLogManager
-import kafka.server.Constants._
import kafka.utils.TestUtils.random
import kafka.utils._
import kafka.zk.ConfigEntityChangeNotificationZNode
@@ -689,7 +688,7 @@ class DynamicConfigChangeUnitTest {
val result = configHandler.parseThrottledPartitions(props, 102,
QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
//Then
- assertEquals(AllReplicas, result)
+
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq,
result)
}
@Test
@@ -700,7 +699,7 @@ class DynamicConfigChangeUnitTest {
102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
}
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null,
null, null)
- assertEquals(AllReplicas, parse(configHandler, "* "))
+
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq,
parse(configHandler, "* "))
assertEquals(Seq(), parse(configHandler, " "))
assertEquals(Seq(6), parse(configHandler, "6:102"))
assertEquals(Seq(6), parse(configHandler, "6:102 "))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3e4644140a3..d4515629454 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -144,8 +144,8 @@ class KafkaApisTest extends Logging {
private val clientRequestQuotaManager: ClientRequestQuotaManager =
mock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager =
mock(classOf[ControllerMutationQuotaManager])
private val replicaQuotaManager: ReplicationQuotaManager =
mock(classOf[ReplicationQuotaManager])
- private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager,
clientRequestQuotaManager,
- clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager,
replicaQuotaManager, None)
+ private val quotas = new QuotaManagers(clientQuotaManager,
clientQuotaManager, clientRequestQuotaManager,
+ clientControllerQuotaManager, replicaQuotaManager, replicaQuotaManager,
replicaQuotaManager, util.Optional.empty())
private val fetchManager: FetchManager = mock(classOf[FetchManager])
private val sharePartitionManager: SharePartitionManager =
mock(classOf[SharePartitionManager])
private val clientMetricsManager: ClientMetricsManager =
mock(classOf[ClientMetricsManager])
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 47f10f3179e..98436915dbf 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import kafka.cluster.Partition
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.{DelayedItem, TestUtils}
@@ -531,7 +531,7 @@ class ReplicaAlterLogDirsThreadTest {
when(replicaManager.fetchMessages(
params = ArgumentMatchers.eq(expectedFetchParams),
fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)),
- quota = ArgumentMatchers.eq(UnboundedQuota),
+ quota = ArgumentMatchers.eq(UNBOUNDED_QUOTA),
responseCallback = callbackCaptor.capture(),
)).thenAnswer(_ => {
callbackCaptor.getValue.apply(Seq((topicIdPartition, responseData)))
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index fde5a9261bc..a398fc68cf0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import kafka.cluster.Partition
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.epoch.util.MockBlockingSender
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.TestUtils
@@ -332,7 +332,7 @@ class ReplicaFetcherThreadTest {
config,
failedPartitions,
replicaManager,
- UnboundedQuota,
+ UNBOUNDED_QUOTA,
mockNetwork
)
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L),
t1p1 -> initialFetchState(Some(topicId1), 0L)))
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 97370de9f9f..c4eb5e30f5b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -263,7 +263,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
replicaManager.fetchMessages(
params = fetchParams,
fetchInfos = Seq(topicIdPartition -> partitionData),
- quota = QuotaFactory.UnboundedQuota,
+ quota = QuotaFactory.UNBOUNDED_QUOTA,
responseCallback = fetchCallback,
)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ddc9cac8d81..245e0063873 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,7 +24,7 @@ import kafka.log.{LogManager, LogManagerTest, UnifiedLog}
import kafka.log.remote.RemoteLogManager
import
org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
+import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.epoch.util.MockBlockingSender
import kafka.server.share.DelayedShareFetch
import kafka.utils.TestUtils.waitUntilTrue
@@ -3311,7 +3311,7 @@ class ReplicaManagerTest {
maxWaitMs: Long = 0,
minBytes: Int = 1,
maxBytes: Int = 1024 * 1024,
- quota: ReplicaQuota = UnboundedQuota,
+ quota: ReplicaQuota = UNBOUNDED_QUOTA,
isolation: FetchIsolation = FetchIsolation.LOG_END,
clientMetadata: Option[ClientMetadata] = None
): Unit = {
@@ -4031,7 +4031,7 @@ class ReplicaManagerTest {
val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1,
1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
// when reading log, it'll throw OffsetOutOfRangeException, which will
be handled separately
- val result = replicaManager.readFromLog(params, Seq(tidp0 -> new
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch),
Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
+ val result = replicaManager.readFromLog(params, Seq(tidp0 -> new
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch),
Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
if (isFromFollower) {
// expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from
follower, since the data is already available in remote log
@@ -4099,7 +4099,7 @@ class ReplicaManagerTest {
}
// when reading log, it'll throw OffsetOutOfRangeException, which will
be handled separately
- replicaManager.fetchMessages(params, Seq(tidp0 -> new
PartitionData(topicId, fetchOffset, 0, 100000,
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))),
UnboundedQuota, fetchCallback)
+ replicaManager.fetchMessages(params, Seq(tidp0 -> new
PartitionData(topicId, fetchOffset, 0, 100000,
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))),
UNBOUNDED_QUOTA, fetchCallback)
val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] =
ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
if (isFromFollower) {
@@ -4198,7 +4198,7 @@ class ReplicaManagerTest {
// create 5 asyncRead tasks, which should enqueue 3 task
for (i <- 1 to 5)
- replicaManager.fetchMessages(params, Seq(tidp0 -> new
PartitionData(topicId, fetchOffset, 0, 100000,
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))),
UnboundedQuota, fetchCallback)
+ replicaManager.fetchMessages(params, Seq(tidp0 -> new
PartitionData(topicId, fetchOffset, 0, 100000,
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))),
UNBOUNDED_QUOTA, fetchCallback)
// wait until at least 2 task submitted to use all the available threads
queueLatch.await(5000, TimeUnit.MILLISECONDS)
@@ -4308,7 +4308,7 @@ class ReplicaManagerTest {
}).when(spyRLM).read(any())
val curExpiresPerSec =
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
- replicaManager.fetchMessages(params, Seq(tidp0 -> new
PartitionData(topicId, fetchOffset, 0, 100000,
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))),
UnboundedQuota, fetchCallback)
+ replicaManager.fetchMessages(params, Seq(tidp0 -> new
PartitionData(topicId, fetchOffset, 0, 100000,
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))),
UNBOUNDED_QUOTA, fetchCallback)
// advancing the clock to expire the delayed remote fetch
timer.advanceClock(2000L)
@@ -6823,7 +6823,7 @@ class ReplicaManagerTest {
replicaManager.readFromLog(
params,
Seq(new TopicIdPartition(topicId, 0, topic) -> new
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch),
Optional.of(leaderEpoch))),
- UnboundedQuota,
+ UNBOUNDED_QUOTA,
readFromPurgatory = false)
} finally {
replicaManager.shutdown(checkpointHW = false)
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index ff90883a4a4..acb5c22ec76 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -39,7 +39,7 @@ class ReplicationQuotaManagerTest {
@Test
def shouldThrottleOnlyDefinedReplicas(): Unit = {
val quota = new ReplicationQuotaManager(new
ReplicationQuotaManagerConfig(), metrics, QuotaType.FETCH, time)
- quota.markThrottled("topic1", Seq(1, 2, 3))
+ quota.markThrottled("topic1", java.util.List.of(1, 2, 3))
assertTrue(quota.isThrottled(tp1(1)))
assertTrue(quota.isThrottled(tp1(2)))
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index e0c931eeef8..a6463cfd443 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -116,7 +116,7 @@ public class KRaftMetadataRequestBenchmark {
private final ReplicationQuotaManager replicaQuotaManager =
Mockito.mock(ReplicationQuotaManager.class);
private final QuotaFactory.QuotaManagers quotaManagers = new
QuotaFactory.QuotaManagers(clientQuotaManager,
clientQuotaManager, clientRequestQuotaManager,
controllerMutationQuotaManager, replicaQuotaManager,
- replicaQuotaManager, replicaQuotaManager, Option.empty());
+ replicaQuotaManager, replicaQuotaManager, Optional.empty());
private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
private final BrokerTopicStats brokerTopicStats = new
BrokerTopicStats(false);
private final KafkaPrincipal principal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");