junrao commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1589688863


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -181,7 +189,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             Time time,
                             Function<TopicPartition, Optional<UnifiedLog>> 
fetchLog,
                             BiConsumer<TopicPartition, Long> 
updateRemoteLogStartOffset,
-                            BrokerTopicStats brokerTopicStats) throws 
IOException {
+                            BrokerTopicStats brokerTopicStats,
+                            Metrics metrics) throws IOException {

Review Comment:
   Could we add the javadoc for the new param?



##########
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+    public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+    private final long quotaBytesPerSecond;
+    private final int numQuotaSamples;
+    private final int quotaWindowSizeSeconds;
+
+    public long getQuotaBytesPerSecond() {

Review Comment:
   For consistency, we don't typically use getters. So this can just be 
quotaBytesPerSecond. Ditto below.



##########
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+    private final RLMQuotaManagerConfig config;
+    private final Metrics metrics;
+    private final QuotaType quotaType;
+    private final String description;
+    private final Time time;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccess;
+    private Quota quota;
+
+    public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.description = description;
+        this.time = time;
+
+        this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+        this.sensorAccess = new SensorAccess(lock, metrics);
+    }
+
+    public void updateQuota(Quota newQuota) {
+        lock.writeLock().lock();
+        try {
+            this.quota = newQuota;
+
+            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+            MetricName quotaMetricName = metricName();
+            KafkaMetric metric = allMetrics.get(quotaMetricName);
+            if (metric != null) {
+                LOGGER.warn("Sensor for quota-id {} already exists. Setting 
quota to {} in MetricConfig", quotaMetricName, newQuota);
+                metric.config(getQuotaMetricConfig(newQuota));
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean isQuotaExceeded() {
+        Sensor sensorInstance = sensor();
+        try {
+            sensorInstance.checkQuotas();
+        } catch (QuotaViolationException qve) {
+            LOGGER.debug("Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",
+                sensorInstance.name(), qve.metric().metricName(), qve.value(), 
qve.bound());
+            return true;
+        }
+        return false;
+    }
+
+    public void record(double value) {
+        sensor().record(value, time.milliseconds(), false);

Review Comment:
   Why do we turn the quota checking off during `record()`? In other 
implementations like `ClientQuotaManager.recordAndGetThrottleTimeMs()`, we call 
`record()` by turning on quota checking and get back the amount of time to 
throttle.



##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##########
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
             "less than or equal to `log.retention.bytes` value.";
     public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+    public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+    public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+            "This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +
+            "The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be copied per second.";
+    public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = 
"remote.log.manager.copy.quota.window.num";
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote copy quota management. " +
+            "The default value is 61, which means there are 60 whole windows + 
1 current window.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 
61;
+
+    public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = 
"remote.log.manager.copy.quota.window.size.seconds";
+    public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each 
sample for remote copy quota management. " +
+            "The default value is 1 second.";
+    public static final int 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+    public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.fetch.max.bytes.per.second";
+    public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of 
bytes that can be fetched from remote storage to local storage per second. " +
+            "This is a global limit for all the partitions that are being 
fetched from remote storage to local storage. " +
+            "The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be fetched per second.";
+    public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+    public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP 
= "remote.log.manager.fetch.quota.window.num";
+    public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote fetch quota management. " 
+
+            "The default value is 11, which means there are 10 whole windows + 
1 current window.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM 
= 11;

Review Comment:
   Why do we choose the default fetch window different from copy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to