Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
gaobosince1987 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2243384804

@abhijeetk88 I am trying the quota feature from trunk in a test cluster, primarily to save CPU when multiple consumers pull at the same time. Is there any documentation on how to set the quota for remote storage? The latest trunk build does not seem to throttle remote fetches for me. Let me know if extra setup is needed for this feature to work.

I have a 6-broker test cluster and 1 test topic with local.retention.ms=1, so all reads go to remote storage. However, consumers are still able to fetch 300MB per broker from remote storage. With the configuration below, I would expect close to zero remote fetch.

[root@kafka-test-02-zookeeper-0 kafka]#./bin/kafka-configs.sh --bootstrap-server kafka-test-02-kafka-bootstrap:9092 --entity-type brokers --entity-default --alter --add-config 'remote.log.manager.fetch.max.bytes.per.second=1,remote.log.manager.copy.max.bytes.per.second=1428800'
Completed updating default config for brokers in the cluster.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2141192632 Thanks @chia7712 @jolshan . Apologies for the miss.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140919323 @jolshan : Thanks for pointing this out. Sorry that I didn't look at the test results carefully before merging.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
chia7712 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140866041 @jolshan I filed https://github.com/apache/kafka/pull/16146 to fix it.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
jolshan commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140804438 Can we look at testCopyQuotaManagerConfig() – kafka.log.remote.RemoteLogManagerTest? It seems like it is failing pretty consistently.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao merged PR #15625: URL: https://github.com/apache/kafka/pull/15625
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1619973284 ## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ## @@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig { "less than or equal to `log.retention.bytes` value."; public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L; +public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second"; +public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " + +"This is a global limit for all the partitions that are being copied from remote storage to local storage. " + +"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second."; +public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE; + +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.copy.quota.window.num"; +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote copy quota management. " + +"The default value is 61, which means there are 60 whole windows + 1 current window."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 61; + +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = "remote.log.manager.copy.quota.window.size.seconds"; +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for remote copy quota management. 
" + +"The default value is 1 second."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1; + +public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.fetch.max.bytes.per.second"; +public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be fetched from remote storage to local storage per second. " + +"This is a global limit for all the partitions that are being fetched from remote storage to local storage. " + +"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be fetched per second."; +public static final Long DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE; + +public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num"; +public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " + +"The default value is 11, which means there are 10 whole windows + 1 current window."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11; Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1619445831

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
             "less than or equal to `log.retention.bytes` value.";
     public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
+    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
+    public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
+            "This is a global limit for all the partitions that are being copied from remote storage to local storage. " +
+            "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second.";
+    public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.copy.quota.window.num";
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote copy quota management. " +
+            "The default value is 61, which means there are 60 whole windows + 1 current window.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 61;
+
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = "remote.log.manager.copy.quota.window.size.seconds";
+    public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for remote copy quota management. " +
+            "The default value is 1 second.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+    public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.fetch.max.bytes.per.second";
+    public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be fetched from remote storage to local storage per second. " +
+            "This is a global limit for all the partitions that are being fetched from remote storage to local storage. " +
+            "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be fetched per second.";
+    public static final Long DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+    public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num";
+    public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " +
+            "The default value is 11, which means there are 10 whole windows + 1 current window.";
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11;

Review Comment: If there is no good reason, perhaps it's better to use the same default window number for copy too.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
showuon commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2137244557 @junrao , since this PR blocks other follow-up PRs and v3.8.0 release date is approaching, I'd like to merge it tomorrow if you don't have any other comments. Thanks.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2136524934

> @abhijeetk88 , there is a merge conflict. Please help resolve it. Thanks.

done

> Do we plan to change the default copy quota window num samples from 61 to 11?

Waiting for a confirmation from Jun.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1618186590 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +public class RLMQuotaManagerConfig { +public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + +private final long quotaBytesPerSecond; +private final int numQuotaSamples; +private final int quotaWindowSizeSeconds; + +public long quotaBytesPerSecond() { +return quotaBytesPerSecond; +} + +public int numQuotaSamples() { +return numQuotaSamples; +} + +public int quotaWindowSizeSeconds() { +return quotaWindowSizeSeconds; +} + +/** + * Configuration settings for quota management + * + * @param quotaBytesPerSecondThe quota in bytes per second + * @param numQuotaSamplesThe number of samples to retain in memory + * @param quotaWindowSizeSeconds The time span of each sample + */ +public RLMQuotaManagerConfig(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) { Review Comment: Done -- This is an automated message from the Apache Git Service. 
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
showuon commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2136406740 @abhijeetk88 , there is a merge conflict. Please help resolve it. Thanks.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
kamalcph commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1617015938 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +public class RLMQuotaManagerConfig { +public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + +private final long quotaBytesPerSecond; +private final int numQuotaSamples; +private final int quotaWindowSizeSeconds; + +public long quotaBytesPerSecond() { +return quotaBytesPerSecond; +} + +public int numQuotaSamples() { +return numQuotaSamples; +} + +public int quotaWindowSizeSeconds() { +return quotaWindowSizeSeconds; +} + +/** + * Configuration settings for quota management + * + * @param quotaBytesPerSecondThe quota in bytes per second + * @param numQuotaSamplesThe number of samples to retain in memory + * @param quotaWindowSizeSeconds The time span of each sample + */ +public RLMQuotaManagerConfig(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) { Review Comment: nit: place constructor before the getter methods -- This is an automated message from the Apache Git Service. 
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2132600646 @showuon @junrao I have addressed your comments. Please take a look.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1613033249 ## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ## @@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig { "less than or equal to `log.retention.bytes` value."; public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L; +public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second"; +public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " + +"This is a global limit for all the partitions that are being copied from remote storage to local storage. " + +"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second."; +public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE; + +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.copy.quota.window.num"; +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote copy quota management. " + +"The default value is 61, which means there are 60 whole windows + 1 current window."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 61; + +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = "remote.log.manager.copy.quota.window.size.seconds"; +public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for remote copy quota management. 
" + +"The default value is 1 second."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1; + +public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.fetch.max.bytes.per.second"; +public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be fetched from remote storage to local storage per second. " + +"This is a global limit for all the partitions that are being fetched from remote storage to local storage. " + +"The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be fetched per second."; +public static final Long DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE; + +public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num"; +public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " + +"The default value is 11, which means there are 10 whole windows + 1 current window."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11; Review Comment: For fetches, the default window size was chosen to match the default window size used for other quotas, such as ClientQuota and ReplicationQuota. Using an 11-second (10 whole + 1 current) window size for copies, similar to other quotas, does seem to be a better option. Consider this: The broker-level quota for copying may be set to 250 MBps. The RLM task records the log segment size with the quota manager when uploading a log segment. The typical log segment size is 500 MB, meaning only one log segment can be uploaded every 2 seconds without breaching the quota. If uploads occur faster, the quota will be exceeded. Therefore, as long as the window size is greater than 2 seconds, either a 10-second or 60-second (whole) window should work. 
However, a shorter window (10 seconds) has advantages. It tracks data uploads more precisely and prevents large spikes in data upload more effectively. For example:

With a 10-second window:
Buckets: b1, b2, ..., b10
In the 10th second, 5 segments can be uploaded without breaching the average quota (5 * 500 MB / 10 seconds = 250 MBps), though the spike will be 2.5 GB in that second.

With a 60-second window:
Buckets: b1, b2, ..., b60
In the 60th second, 30 segments can be uploaded without breaching the average quota (30 * 500 MB / 60 seconds = 250 MBps), but the spike will be 15 GB in that second.

Given the need to avoid quota breaches, a 10-second window is preferable to a 60-second window. Let me know if it makes sense. I can change the default copy window to be the same as the default fetch window.
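The arithmetic in the comment above can be reproduced with a small sketch. It follows the simplified model used in the comment (the rate is averaged over the whole window and all earlier buckets are empty) and is not taken from the PR. The class and method names (`BurstWindowSketch`, `maxBurstBytes`, `maxBurstSegments`) are hypothetical, and MB is assumed to mean 10^6 bytes.

```java
// Illustrative only: reproduces the worst-case burst arithmetic from the review comment.
// Assumption: the quota is enforced on the average over the whole window,
// and every bucket except the last one is empty.
final class BurstWindowSketch {

    // Largest number of bytes that can land in the final second of the window
    // while the window average still equals the quota.
    static long maxBurstBytes(long quotaBytesPerSecond, int wholeWindowSeconds) {
        return Math.multiplyExact(quotaBytesPerSecond, (long) wholeWindowSeconds);
    }

    // The same burst expressed as a count of whole log segments.
    static long maxBurstSegments(long quotaBytesPerSecond, int wholeWindowSeconds, long segmentBytes) {
        return maxBurstBytes(quotaBytesPerSecond, wholeWindowSeconds) / segmentBytes;
    }

    public static void main(String[] args) {
        long quota = 250L * 1_000_000L;   // 250 MB/s (decimal MB, an assumption)
        long segment = 500L * 1_000_000L; // 500 MB log segment, as in the comment
        for (int window : new int[] {10, 60}) {
            System.out.printf("window=%ds burst=%d bytes (%d segments)%n",
                    window,
                    maxBurstBytes(quota, window),
                    maxBurstSegments(quota, window, segment));
        }
    }
}
```

Under this model the permitted burst grows linearly with the window length, which is the reason the comment prefers the shorter window.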
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1612931258 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +KafkaMetric metric = allMetrics.get(quotaMetricName); +if (metric != null) { +LOGGER.warn("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", quotaMetricName, newQuota);
+                metric.config(getQuotaMetricConfig(newQuota));
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean isQuotaExceeded() {
+        Sensor sensorInstance = sensor();
+        try {
+            sensorInstance.checkQuotas();
+        } catch (QuotaViolationException qve) {
+            LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})",
+                sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound());
+            return true;
+        }
+        return false;
+    }
+
+    public void record(double value) {
+        sensor().record(value, time.milliseconds(), false);

Review Comment: In KIP-956, we do not utilize the throttle time provided by the quota manager to regulate fetches and copies.

For fetch operations, we first verify quota availability before initiating the retrieval of remote data. If the quota is unavailable, our priority is to serve partitions requiring local data, rather than throttling the client. Therefore, we focus on fulfilling data requests for other partitions in the queue, eliminating the need for throttle time in fetch operations.

Similarly, when an RLM task attempts to copy a segment, it first checks if the write quota is available. If the quota is not available, the thread waits until the quota becomes available. As a result, we do not require throttle time for copies either.
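The check-before-work pattern described in the comment above can be sketched as follows. This is a toy model, not the PR's `RLMQuotaManager`: it does not use Kafka's `Sensor` or `Quota` classes, timestamps are passed in explicitly so the behaviour is deterministic, and every name in it (`QuotaGateSketch`, `tryRemoteFetch`, `copyWhenAllowed`) is hypothetical. The blocking wait of a copy thread is simulated by advancing a clock value.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy windowed byte-rate quota; an assumption-laden stand-in for illustration only.
final class QuotaGateSketch {
    private final long quotaBytesPerSecond;
    private final long windowMs;
    private final Deque<long[]> samples = new ArrayDeque<>(); // each entry: {timestampMs, bytes}
    private long bytesInWindow = 0;

    QuotaGateSketch(long quotaBytesPerSecond, int windowSeconds) {
        this.quotaBytesPerSecond = quotaBytesPerSecond;
        this.windowMs = windowSeconds * 1000L;
    }

    // Record bytes that were transferred at the given time.
    void record(long bytes, long nowMs) {
        expire(nowMs);
        samples.addLast(new long[] {nowMs, bytes});
        bytesInWindow += bytes;
    }

    // True when the average rate over the window is above the quota.
    boolean isQuotaExceeded(long nowMs) {
        expire(nowMs);
        double rate = bytesInWindow / (windowMs / 1000.0);
        return rate > quotaBytesPerSecond;
    }

    // Drop samples that have fallen out of the window.
    private void expire(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMs - windowMs) {
            bytesInWindow -= samples.removeFirst()[1];
        }
    }

    // Fetch path: if the quota is exhausted, skip the remote read instead of throttling.
    static boolean tryRemoteFetch(QuotaGateSketch quota, long bytes, long nowMs) {
        if (quota.isQuotaExceeded(nowMs)) {
            return false; // caller moves on to partitions that need local data
        }
        quota.record(bytes, nowMs);
        return true;
    }

    // Copy path: wait (simulated by advancing time) until the quota is available,
    // then record the copied bytes. Returns the time at which the copy proceeded.
    static long copyWhenAllowed(QuotaGateSketch quota, long bytes, long nowMs, long pollMs) {
        long t = nowMs;
        while (quota.isQuotaExceeded(t)) {
            t += pollMs; // stand-in for a blocked copy thread
        }
        quota.record(bytes, t);
        return t;
    }

    public static void main(String[] args) {
        QuotaGateSketch quota = new QuotaGateSketch(100, 10);
        System.out.println("first fetch served: " + tryRemoteFetch(quota, 1500, 0));
        System.out.println("second fetch served: " + tryRemoteFetch(quota, 10, 1000));
        System.out.println("copy proceeds at ms: " + copyWhenAllowed(quota, 500, 1000, 1000));
    }
}
```

Because the check happens before the transfer and the bytes are recorded afterwards, a single large operation can overshoot the quota; later operations are then held back until the window drains. Neither path needs a computed throttle time, which matches the reasoning in the comment.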
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
showuon commented on PR #15625: URL: https://github.com/apache/kafka/pull/15625#issuecomment-2126707276 @abhijeetk88 , do we have any update on this PR?
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1597682003 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -181,7 +189,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, Time time, Function> fetchLog, BiConsumer updateRemoteLogStartOffset, -BrokerTopicStats brokerTopicStats) throws IOException { +BrokerTopicStats brokerTopicStats, +Metrics metrics) throws IOException { Review Comment: Sure, will add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1597667627 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +public class RLMQuotaManagerConfig { +public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + +private final long quotaBytesPerSecond; +private final int numQuotaSamples; +private final int quotaWindowSizeSeconds; + +public long getQuotaBytesPerSecond() { Review Comment: Sure, will change these. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597667613

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {

Review Comment:
Created https://issues.apache.org/jira/browse/KAFKA-16706
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1597666914 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +KafkaMetric metric = allMetrics.get(quotaMetricName); +if (metric != null) { +LOGGER.warn("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", quotaMetricName, newQuota); Review Comment: On second thought, INFO is the right level: a quota update is a significant change in application state and determines how fast copies to and fetches from remote storage can happen. Quota updates are also infrequent, so logging them at INFO will not cause excessive logging.
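For readers following the log-level discussion, here is a minimal, self-contained sketch of a lock-guarded quota update that logs at INFO when the metric already exists. It is not the PR's code: `QuotaUpdateSketch`, its `metricBounds` map, and `registerMetric()` are hypothetical stand-ins for Kafka's `Metrics`, `Sensor`, and `MetricConfig`, and `java.util.logging` is used only to keep the example free of dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Logger;

// Hypothetical stand-in for RLMQuotaManager; it does not use Kafka's metrics classes.
class QuotaUpdateSketch {
    private static final Logger LOGGER = Logger.getLogger(QuotaUpdateSketch.class.getName());

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Hypothetical registry: metric name -> quota bound in bytes per second.
    // Every access happens under the lock, so a plain HashMap is sufficient.
    private final Map<String, Double> metricBounds = new HashMap<>();
    private final String quotaMetricName;
    private double quotaBytesPerSecond;

    QuotaUpdateSketch(String quotaMetricName, double quotaBytesPerSecond) {
        this.quotaMetricName = quotaMetricName;
        this.quotaBytesPerSecond = quotaBytesPerSecond;
    }

    // Simulates the metric being created lazily with the current quota.
    void registerMetric() {
        lock.writeLock().lock();
        try {
            metricBounds.put(quotaMetricName, quotaBytesPerSecond);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void updateQuota(double newQuota) {
        lock.writeLock().lock();
        try {
            quotaBytesPerSecond = newQuota;
            // Direct lookup by name. On a dynamic update an existing metric is
            // the expected case, so the message is logged at INFO, not WARN.
            if (metricBounds.containsKey(quotaMetricName)) {
                LOGGER.info("Metric " + quotaMetricName + " already exists. Setting quota to " + newQuota);
                metricBounds.put(quotaMetricName, newQuota);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    double currentQuota() {
        lock.readLock().lock();
        try {
            return quotaBytesPerSecond;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns null when the metric has not been registered yet.
    Double metricBound() {
        lock.readLock().lock();
        try {
            return metricBounds.get(quotaMetricName);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The write lock covers both the field assignment and the metric update, so a reader never sees the new quota paired with the old metric bound.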
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597664809

## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;

+public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
+public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being copied from remote storage to local storage. " +

Review Comment:
My bad. Thanks for catching this.
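As an illustration of the fix being acknowledged here, the doc string might read as below once the direction in the second sentence matches the first. Only the property name and the first sentence come from the quoted diff. The holder class `CopyQuotaDocSketch` and the exact wording of the corrected second sentence are assumptions, and the trailing part of the original doc string (cut off in the quote) is not reproduced.

```java
// Hypothetical holder class; the real constants live in RemoteLogManagerConfig.
final class CopyQuotaDocSketch {
    static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP =
        "remote.log.manager.copy.max.bytes.per.second";

    // The second sentence is an assumed correction: copies go from local to remote storage.
    static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC =
        "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
        "This is a global limit for all the partitions that are being copied from local storage to remote storage.";

    private CopyQuotaDocSketch() {
        // Constants only; not meant to be instantiated.
    }
}
```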
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
junrao commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1589688863 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -181,7 +189,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, Time time, Function> fetchLog, BiConsumer updateRemoteLogStartOffset, -BrokerTopicStats brokerTopicStats) throws IOException { +BrokerTopicStats brokerTopicStats, +Metrics metrics) throws IOException { Review Comment: Could we add the javadoc for the new param? ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +public class RLMQuotaManagerConfig { +public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; + +private final long quotaBytesPerSecond; +private final int numQuotaSamples; +private final int quotaWindowSizeSeconds; + +public long getQuotaBytesPerSecond() { Review Comment: For consistency, we don't typically use getters. So this can just be quotaBytesPerSecond. Ditto below. 
## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time 
time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +K
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
showuon commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1588939433 ## storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java: ## @@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig { "less than or equal to `log.retention.bytes` value."; public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L; +public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second"; +public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " + +"This is a global limit for all the partitions that are being copied from remote storage to local storage. " + Review Comment: `This is a global limit for all the partitions that are being copied from remote storage to local storage.` <-- is it right? Copied from local storage to remote storage? ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +KafkaMetric metric = allMetrics.get(quotaMetricName); +if (metric != null) { +LOGGER.warn("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", quotaMetricName, newQuota); Review Comment: I'd like to know why we log at WARN here. It seems to me that if we want to update the quota dynamically, the metric is expected to already exist, right? If so, I don't think this is unexpected. So maybe INFO or DEBUG level, WDYT? ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *ht
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1580557688 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +allMetrics.forEach((metricName, metric) -> { +if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) { +Map metricTags = 
metricName.tags(); +LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota); +metric.config(getQuotaMetricConfig(newQuota)); +} +}); +} finally { +lock.writeLock().unlock(); +} +} + +public boolean isQuotaExceeded() { +Sensor sensorInstance = sensor(); +try { +sensorInstance.checkQuotas(); +} catch (QuotaViolationException qve) { +LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})", Review Comment: The quota type is already captured in the sensor instance name, so we can skip logging it separately.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580549399

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
Added.
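A sketch of what a `toString()` on this config class could look like is given below. The three field names and the `3600` constant are taken from the diff quoted elsewhere in this thread, and the accessors drop the `get` prefix as suggested in the review. The class name `RLMQuotaManagerConfigSketch`, the constructor argument order, and the output format are assumptions, not the merged code.

```java
// Hypothetical variant of RLMQuotaManagerConfig; the output format is illustrative only.
class RLMQuotaManagerConfigSketch {
    static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;

    private final long quotaBytesPerSecond;
    private final int numQuotaSamples;
    private final int quotaWindowSizeSeconds;

    // Argument order is assumed from the test call new RLMQuotaManagerConfig(50, 11, 1).
    RLMQuotaManagerConfigSketch(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) {
        this.quotaBytesPerSecond = quotaBytesPerSecond;
        this.numQuotaSamples = numQuotaSamples;
        this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
    }

    // Accessors without the "get" prefix.
    long quotaBytesPerSecond() {
        return quotaBytesPerSecond;
    }

    int numQuotaSamples() {
        return numQuotaSamples;
    }

    int quotaWindowSizeSeconds() {
        return quotaWindowSizeSeconds;
    }

    @Override
    public String toString() {
        return "RLMQuotaManagerConfig{" +
            "quotaBytesPerSecond=" + quotaBytesPerSecond +
            ", numQuotaSamples=" + numQuotaSamples +
            ", quotaWindowSizeSeconds=" + quotaWindowSizeSeconds +
            '}';
    }
}
```

Having all three values in one string makes it easy to log the effective quota configuration at broker startup or after a dynamic update.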
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1580545776 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +allMetrics.forEach((metricName, metric) -> { +if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) { +Map metricTags = 
metricName.tags(); +LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota); +metric.config(getQuotaMetricConfig(newQuota)); Review Comment: Ack, will change this. Also, metricTags is empty, so I will fix the log statement as well.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580515079

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
Will add.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1580499863 ## core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RLMQuotaManagerTest { +private final MockTime time = new MockTime(); +private final Metrics metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time); +private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$; +private static final String DESCRIPTION = "Tracking byte rate"; + +@Test +public void testQuotaExceeded() { +RLMQuotaManager quotaManager = new RLMQuotaManager( +new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time); + +assertFalse(quotaManager.isQuotaExceeded()); +quotaManager.record(500); +// Move clock by 1 sec, quota is violated +moveClock(1); +assertTrue(quotaManager.isQuotaExceeded()); + +// Move clock by another 8 secs, quota is still violated for the window +moveClock(8); +assertTrue(quotaManager.isQuotaExceeded()); + +// Move clock by 1 sec, quota is no more violated +moveClock(1); +assertFalse(quotaManager.isQuotaExceeded()); +} + +@Test +public void testQuotaUpdate() { +RLMQuotaManager quotaManager = new RLMQuotaManager( +new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time); + +assertFalse(quotaManager.isQuotaExceeded()); +quotaManager.record(51); +assertTrue(quotaManager.isQuotaExceeded()); + +Map fetchQuotaMetrics = 
metrics.metrics().entrySet().stream() +.filter(entry -> entry.getKey().name().equals("byte-rate") && entry.getKey().group().equals(QUOTA_TYPE.toString())) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +Map nonQuotaMetrics = metrics.metrics().entrySet().stream() +.filter(entry -> !entry.getKey().name().equals("byte-rate") || !entry.getKey().group().equals(QUOTA_TYPE.toString())) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +assertEquals(1, fetchQuotaMetrics.size()); +assertFalse(nonQuotaMetrics.isEmpty()); + +Map configForQuotaMetricsBeforeUpdate = extractMetricConfig(fetchQuotaMetrics); +Map configForNonQuotaMetricsBeforeUpdate = extractMetricConfig(nonQuotaMetrics); + +// Update quota to 60, quota is no more violated +Quota quota60Bytes = new Quota(60, true); +quotaManager.updateQuota(quota60Bytes); +assertFalse(quotaManager.isQuotaExceeded()); + +// Verify quota metrics were updated +Map configForQuotaMetricsAfterFirstUpdate = extractMetricConfig(fetchQuotaMetrics); +assertNotEquals(configForQuotaMetricsBeforeUpdate, configForQuotaMetricsAfterFirstUpdate); +fetchQuotaMetrics.values().forEach(metric -> assertEquals(metric.config().quota(), quota60Bytes)); Review Comment: Yes, thanks for pointing out. ## core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
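The timings asserted in `testQuotaExceeded` and `testQuotaUpdate` can be checked with simple arithmetic. The sketch below assumes the rate is the total recorded bytes divided by the elapsed time since the first sample, with the elapsed time floored at one window, and that the quota counts as violated only when the rate is strictly greater than the bound. `SimpleRateSketch` and its methods are hypothetical helpers, not Kafka's `SimpleRate`, and the sketch also assumes the recorded sample has not yet aged out of the 11 one-second samples.

```java
// Hypothetical helper that mirrors the arithmetic behind the test assertions.
class SimpleRateSketch {
    // Observed rate in bytes per second: total bytes over the elapsed time,
    // where the elapsed time is never treated as shorter than one window.
    static double rate(long totalBytes, long elapsedSeconds, long windowSizeSeconds) {
        long effectiveSeconds = Math.max(elapsedSeconds, windowSizeSeconds);
        return (double) totalBytes / effectiveSeconds;
    }

    // An upper-bound quota is violated only when the rate is strictly above it.
    static boolean isQuotaExceeded(long totalBytes, long elapsedSeconds,
                                   long windowSizeSeconds, double quotaBytesPerSecond) {
        return rate(totalBytes, elapsedSeconds, windowSizeSeconds) > quotaBytesPerSecond;
    }
}
```

With a quota of 50 bytes per second and 500 bytes recorded, the rate is 500/1 = 500 after 1 second and 500/9 ≈ 55.6 after 9 seconds, both above the bound, while after 10 seconds it is 500/10 = 50, which no longer exceeds it. For `testQuotaUpdate`, 51 bytes with no elapsed time gives 51/1 = 51, which violates a bound of 50 but not a bound of 60.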
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
kamalcph commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1577552070

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java: ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
do we need to add `toString()`?

## core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java: ##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RLMQuotaManagerTest {
+    private final MockTime time = new MockTime();
+    private final Metrics metrics = new Metrics(new MetricConfig(), Collections.emptyList(), time);
+    private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$;
+    private static final String DESCRIPTION = "Tracking byte rate";
+
+    @Test
+    public void testQuotaExceeded() {
+        RLMQuotaManager quotaManager = new RLMQuotaManager(
+            new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time);
+
+        assertFalse(quotaManager.isQuotaExceeded());
+        quotaManager.record(500);
+        // Move clock by 1 sec, quota is violated
+        moveClock(1);
+        assertTrue(quotaManager.isQuotaExceeded());
+
+        // Move clock by another 8 secs, quota is still violated for the window
+        moveClock(8);
+        assertTrue(quotaManager.isQuotaExceeded());
+
+        // Move clock by 1 sec, quota is no more violated
+        moveClock(1);
+        assertFalse(quotaManager.isQuotaExceeded());
+    }
+
+    @Test
+    public void testQuotaUpdate() {
+        RLMQuotaManager quotaManager = new RLMQuotaManager(
+            new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, DESCRIPTION, time);
+
+        assertFalse(quotaManager.isQuotaExceeded());
+        quotaManager.record(51);
+        assertTrue(quotaManager.isQuotaExceeded());
+
+        Map<MetricName, KafkaMetric> fetchQuotaMetrics = metrics.metrics().entrySet().stream()
+            .filter(entry -> entry.getKey().name().equals("byte-rate") && entry.getKey().group().equals(QUOTA_TYPE.toString()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        Map<MetricName, KafkaMetric> nonQuotaMetrics = metrics.metrics().entrySet().stream()
+            .filter(entry -> !entry.getKey().name().equals("byte-rate") || !entry.getKey().group().equals(QUOTA_TYPE.toString()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        assertEquals(1, fetchQuotaMetrics.size());
+        assertFalse(nonQuotaMetrics.isEmpty());
+
+        Map configForQuotaMetricsBeforeUpdate = extractMetricConfig(fetchQuo
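The timings asserted in `testQuotaExceeded` and `testQuotaUpdate` follow from simple arithmetic, assuming the rate is computed the way `SimpleRate` is generally understood to work: total recorded bytes divided by the larger of the elapsed time and one sample window. Under that assumption, 500 bytes against a 50 B/s bound stays in violation until 10 seconds have elapsed, at which point the rate equals the bound and no longer counts as exceeded. A minimal standalone sketch of that reasoning (`RateWindowSketch` and its methods are illustrative names, not part of the PR):

```java
// Sketch only: reproduces the arithmetic the test assertions appear to rely on.
// Assumption: rate = totalBytes / max(elapsed, one sample window).
final class RateWindowSketch {

    // Bytes per second over the effective window.
    static double rate(long totalBytes, long elapsedMs, long windowMs) {
        long effectiveMs = Math.max(elapsedMs, windowMs);
        return totalBytes / (effectiveMs / 1000.0);
    }

    // An upper-bound quota is violated only when the rate is strictly above the bound.
    static boolean exceeded(double rate, double quotaBytesPerSecond) {
        return rate > quotaBytesPerSecond;
    }

    public static void main(String[] args) {
        // Same checkpoints as testQuotaExceeded: 1 s, 9 s and 10 s after record(500).
        long[] elapsedSeconds = {1, 9, 10};
        for (long s : elapsedSeconds) {
            double r = rate(500, s * 1000, 1000);
            System.out.printf("t=%ds rate=%.1f B/s exceeded=%b%n", s, r, exceeded(r, 50));
        }
    }
}
```

The same formula also covers `testQuotaUpdate`: 51 bytes recorded with no time elapsed is measured over the 1-second minimum window, giving 51 B/s, which is just above the 50 B/s bound.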
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1576007574

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ##

Review Comment:
Yes, the integration of the quota manager will come in the follow-up PRs. I have mentioned it in the description of the PR.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1576005665

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class);
+
+    private final RLMQuotaManagerConfig config;
+    private final Metrics metrics;
+    private final QuotaType quotaType;
+    private final String description;
+    private final Time time;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccess;
+    private Quota quota;
+
+    public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.description = description;
+        this.time = time;
+
+        this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+        this.sensorAccess = new SensorAccess(lock, metrics);
+    }
+
+    public void updateQuota(Quota newQuota) {
+        lock.writeLock().lock();
+        try {
+            this.quota = newQuota;
+
+            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+            MetricName quotaMetricName = metricName();
+            allMetrics.forEach((metricName, metric) -> {
+                if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) {
+                    Map<String, String> metricTags = metricName.tags();
+                    LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota);

Review Comment:
Makes sense, will change this.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
funky-eyes commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1572073595

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ##

Review Comment:
I have a question: what this PR does is provide a standard configuration and create the corresponding rate limiters and related monitoring metrics, right? They then need to be used in the corresponding RemoteStorageManager, correct?
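To make the question concrete, here is a purely hypothetical sketch of how a caller might consult such a quota around a remote read. Only the two method names `isQuotaExceeded()` and `record(...)` are taken from the test in this PR; the `ByteRateQuota` interface, `ThrottledFetcher` and `ToyQuota` are invented for illustration, and the real integration is deferred to follow-up PRs and may look quite different.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in exposing the two calls exercised in RLMQuotaManagerTest.
interface ByteRateQuota {
    boolean isQuotaExceeded();
    void record(long bytes);
}

// Hypothetical caller: checks the quota before a remote read and records bytes afterwards.
final class ThrottledFetcher {
    static final long SKIPPED = -1L;
    private final ByteRateQuota quota;

    ThrottledFetcher(ByteRateQuota quota) {
        this.quota = quota;
    }

    // Runs the fetch only while the quota is not exceeded; returns SKIPPED otherwise.
    long fetchIfAllowed(LongSupplier remoteFetch) {
        if (quota.isQuotaExceeded()) {
            return SKIPPED;
        }
        long bytes = remoteFetch.getAsLong();
        quota.record(bytes);
        return bytes;
    }
}

// Toy quota for the demo: "exceeded" once more than limitBytes have been recorded.
final class ToyQuota implements ByteRateQuota {
    private final long limitBytes;
    private long recorded;

    ToyQuota(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    @Override
    public boolean isQuotaExceeded() {
        return recorded > limitBytes;
    }

    @Override
    public void record(long bytes) {
        recorded += bytes;
    }
}
```

Note that in this check-then-record shape a single fetch can overshoot the limit, since bytes are only recorded after the read completes; the quota gates subsequent fetches rather than the one in flight.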
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
HenryCaiHaiying commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1566566286

## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class);
+
+    private final RLMQuotaManagerConfig config;
+    private final Metrics metrics;
+    private final QuotaType quotaType;
+    private final String description;
+    private final Time time;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final SensorAccess sensorAccess;
+    private Quota quota;
+
+    public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) {
+        this.config = config;
+        this.metrics = metrics;
+        this.quotaType = quotaType;
+        this.description = description;
+        this.time = time;
+
+        this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+        this.sensorAccess = new SensorAccess(lock, metrics);
+    }
+
+    public void updateQuota(Quota newQuota) {
+        lock.writeLock().lock();
+        try {
+            this.quota = newQuota;
+
+            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+            MetricName quotaMetricName = metricName();
+            allMetrics.forEach((metricName, metric) -> {
+                if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) {
+                    Map<String, String> metricTags = metricName.tags();
+                    LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota);

Review Comment:
LOGGER.warn ?
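For readers skimming the quoted `updateQuota` hunk: it takes the write lock before swapping the quota. Below is a minimal sketch of that read/write-lock pattern in isolation, assuming readers only compare an observed rate against the current bound. `SwappableQuota` is an illustrative name, not a class from the PR; judging by the log message, the real method also appears to push the new quota into each matching metric's `MetricConfig`, which is omitted here.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: a quota bound that can be replaced at runtime while readers check it.
final class SwappableQuota {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private double bytesPerSecond;

    SwappableQuota(double initialBytesPerSecond) {
        this.bytesPerSecond = initialBytesPerSecond;
    }

    // Writers take the exclusive lock so no reader observes a half-applied update.
    void update(double newBytesPerSecond) {
        lock.writeLock().lock();
        try {
            bytesPerSecond = newBytesPerSecond;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock; many checks can proceed concurrently.
    boolean isExceeded(double observedBytesPerSecond) {
        lock.readLock().lock();
        try {
            return observedBytesPerSecond > bytesPerSecond;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Releasing the lock in a `finally` block mirrors the `try` structure visible in the quoted code and keeps the lock from leaking if the update throws.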