This is an automated email from the ASF dual-hosted git repository. cconnell pushed a commit to branch HBASE-29479/accurate-quota-cache in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 72aebe2ce1610af8c74b88938c5a041c1a18477d Author: Charles Connell <[email protected]> AuthorDate: Thu Aug 7 10:30:54 2025 -0400 Refactor default quota --- .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 73 +++++++++++++++------- 1 file changed, 51 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index b7bf29d1f0d..08627d92526 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.quotas; -import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; - import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -30,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -49,7 +48,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; @@ -203,11 +201,11 @@ public class QuotaCache implements Stoppable { */ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { String user = getQuotaUserName(ugi); - return computeIfAbsent(userQuotaCache, user, () -> { - Optional<UserQuotaState> userQuota = fetchOne(user, userQuotaStateFetcher); - return userQuota - .orElseGet(() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); - }); + if (!userQuotaCache.containsKey(user)) { + userQuotaCache.put(user, QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); + fetch("user", userQuotaCache, userQuotaStateFetcher); + } + return userQuotaCache.get(user); } /** @@ -216,8 +214,11 @@ public class QuotaCache implements Stoppable { * @return the limiter associated to the specified table */ public QuotaLimiter getTableLimiter(final TableName table) { - return computeIfAbsent(this.tableQuotaCache, table, - () -> fetchOne(table, tableQuotaStateFetcher).orElse(new QuotaState())).getGlobalLimiter(); + if (!tableQuotaCache.containsKey(table)) { + tableQuotaCache.put(table, new QuotaState()); + fetch("table", tableQuotaCache, tableQuotaStateFetcher); + } + return tableQuotaCache.get(table).getGlobalLimiter(); } /** @@ -226,9 +227,11 @@ public class QuotaCache implements Stoppable { * @return the limiter associated to the specified namespace */ public QuotaLimiter getNamespaceLimiter(final String namespace) { - return computeIfAbsent(this.namespaceQuotaCache, namespace, - () -> fetchOne(namespace, namespaceQuotaStateFetcher).orElse(new QuotaState())) - .getGlobalLimiter(); + if (!namespaceQuotaCache.containsKey(namespace)) { + namespaceQuotaCache.put(namespace, new QuotaState()); + fetch("namespace", namespaceQuotaCache, namespaceQuotaStateFetcher); + } + return namespaceQuotaCache.get(namespace).getGlobalLimiter(); } /** @@ -237,9 +240,11 @@ public class QuotaCache implements Stoppable { * @return the limiter associated to the specified region server */ public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { - return computeIfAbsent(this.regionServerQuotaCache, regionServer, - () -> fetchOne(regionServer, regionServerQuotaStateFetcher).orElse(new QuotaState())) - .getGlobalLimiter(); + if (!regionServerQuotaCache.containsKey(regionServer)) { + regionServerQuotaCache.put(regionServer, new QuotaState()); + fetch("regionServer", regionServerQuotaCache, regionServerQuotaStateFetcher); + } + return regionServerQuotaCache.get(regionServer).getGlobalLimiter(); } protected boolean isExceedThrottleQuotaEnabled() { @@ -265,6 +270,30 @@ public class QuotaCache implements Stoppable { } } + private <K, V extends QuotaState> void fetch(final String type, + final Map<K, V> quotasMap, final Fetcher<K, V> fetcher) { + // Find the quota entries to update + List<Get> gets = quotasMap.keySet().stream().map(fetcher::makeGet).collect(Collectors.toList()); + + // fetch and update the quota entries + if (!gets.isEmpty()) { + try { + for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { + V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); + if (quotaInfo != null) { + quotaInfo.update(entry.getValue()); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Loading {} key={} quotas={}", type, entry.getKey(), quotaInfo); + } + } + } catch (IOException e) { + LOG.warn("Unable to read {} from quota table", type, e); + } + } + } + /** * Applies a request attribute user override if available, otherwise returns the UGI's short * username @@ -370,10 +399,10 @@ public class QuotaCache implements Stoppable { .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState()); updateQuotaFactors(); - fetch("namespace", QuotaCache.this.namespaceQuotaCache, namespaceQuotaStateFetcher); - fetch("table", QuotaCache.this.tableQuotaCache, tableQuotaStateFetcher); - fetch("user", QuotaCache.this.userQuotaCache, userQuotaStateFetcher); - fetch("regionServer", QuotaCache.this.regionServerQuotaCache, regionServerQuotaStateFetcher); + fetchAndEvict("namespace", QuotaCache.this.namespaceQuotaCache, namespaceQuotaStateFetcher); + fetchAndEvict("table", QuotaCache.this.tableQuotaCache, tableQuotaStateFetcher); + fetchAndEvict("user", QuotaCache.this.userQuotaCache, userQuotaStateFetcher); + fetchAndEvict("regionServer", QuotaCache.this.regionServerQuotaCache, regionServerQuotaStateFetcher); fetchExceedThrottleQuota(); } @@ -386,7 +415,7 @@ public class QuotaCache implements Stoppable { } } - private <K, V extends QuotaState> void fetch(final String type, + private <K, V extends QuotaState> void fetchAndEvict(final String type, final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { long now = EnvironmentEdgeManager.currentTime(); long evictPeriod = getPeriod() * EVICT_PERIOD_FACTOR; @@ -543,7 +572,7 @@ public class QuotaCache implements Stoppable { T get() throws Exception; } - static interface Fetcher<Key, Value> { + interface Fetcher<Key, Value> { Get makeGet(Key key); Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
