showuon commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1588939433


##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##########
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
             "less than or equal to `log.retention.bytes` value.";
     public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
+    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
+            "This is a global limit for all the partitions that are being copied from remote storage to local storage. " +

Review Comment:
   `This is a global limit for all the partitions that are being copied from 
remote storage to local storage.` <-- Is this right? Shouldn't it be "copied 
from local storage to remote storage"?
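
   A possible wording, assuming this config throttles the copy (upload) 
direction, as the property name and the first sentence suggest. The holder 
class `CopyQuotaDocSketch` is hypothetical and only there to make the snippet 
compile; the rest of the original doc string is not visible in this hunk and 
is left out.

```java
// Hypothetical holder class; in the PR these constants live in RemoteLogManagerConfig.
public final class CopyQuotaDocSketch {
    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP =
            "remote.log.manager.copy.max.bytes.per.second";

    // Both sentences now describe the same direction: local storage -> remote storage.
    // Assumption: only the two sentences shown in the diff are reproduced here.
    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC =
            "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
            "This is a global limit for all the partitions that are being copied from local storage to remote storage.";

    private CopyQuotaDocSketch() { }
}
```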



##########
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class);
+
+    private final RLMQuotaManagerConfig config;
+    private final Metrics metrics;
+    private final QuotaType quotaType;
+    private final String description;
+    private final Time time;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccess;
+    private Quota quota;
+
+    public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.description = description;
+        this.time = time;
+
+        this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+        this.sensorAccess = new SensorAccess(lock, metrics);
+    }
+
+    public void updateQuota(Quota newQuota) {
+        lock.writeLock().lock();
+        try {
+            this.quota = newQuota;
+
+            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+            MetricName quotaMetricName = metricName();
+            KafkaMetric metric = allMetrics.get(quotaMetricName);
+            if (metric != null) {
+                LOGGER.warn("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota);

Review Comment:
   Why do we log at WARN level here? If we want to update the quota 
dynamically, the metric is expected to already exist, right? If so, this is not 
an unexpected condition, so maybe INFO or DEBUG level would fit better. WDYT?
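
   To illustrate, here is a rough, self-contained sketch of the idea, not the 
actual `RLMQuotaManager`. Everything in it is an assumption for illustration: 
the class `QuotaUpdateLogLevelSketch`, the `registerMetric` and `metricQuota` 
helpers, and the plain map standing in for the quota held in `MetricConfig`. 
It uses `java.util.logging` instead of SLF4J only so it compiles without extra 
dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;

// Hypothetical stand-in for RLMQuotaManager; no Kafka classes are used.
public class QuotaUpdateLogLevelSketch {
    private static final Logger LOGGER =
            Logger.getLogger(QuotaUpdateLogLevelSketch.class.getName());

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Stand-in for the per-metric quota that the real code keeps in MetricConfig.
    private final Map<String, Double> metricQuotas = new HashMap<>();
    private double quota;

    public QuotaUpdateLogLevelSketch(double initialQuota) {
        this.quota = initialQuota;
    }

    // Simulates the sensor (and its metric) being created on first use.
    public void registerMetric(String metricName) {
        lock.writeLock().lock();
        try {
            metricQuotas.put(metricName, quota);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns true if an already-registered metric was updated.
    public boolean updateQuota(String metricName, double newQuota) {
        lock.writeLock().lock();
        try {
            this.quota = newQuota;
            if (metricQuotas.containsKey(metricName)) {
                // Expected path for a dynamic update, so INFO rather than WARN.
                LOGGER.info("Sensor for quota-id " + metricName
                        + " already exists. Setting quota to " + newQuota);
                metricQuotas.put(metricName, newQuota);
                return true;
            }
            return false;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns the quota recorded for the metric, or null if it is not registered.
    public Double metricQuota(String metricName) {
        lock.readLock().lock();
        try {
            return metricQuotas.get(metricName);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

   In this sketch an existing metric is the normal case once the sensor has 
been created, which is why the message is not logged at WARN.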



##########
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {

Review Comment:
   This class is quite similar to `ReplicationQuotaManager`. Could you please 
create a JIRA issue to track whether we can refactor them to avoid duplication 
in the future? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
