This is an automated email from the ASF dual-hosted git repository.
cconnell pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 2ab16b8b52d HBASE-29677: Thread safety in QuotaRefresherChore (#7401)
2ab16b8b52d is described below
commit 2ab16b8b52daa511bb60db4216e454e17b29b535
Author: Charles Connell <[email protected]>
AuthorDate: Wed Oct 22 10:00:26 2025 -0400
HBASE-29677: Thread safety in QuotaRefresherChore (#7401)
Signed-off-by: Ray Mattingly <[email protected]>
---
.../org/apache/hadoop/hbase/quotas/QuotaCache.java | 79 +++++++++++-----------
.../hadoop/hbase/quotas/TestQuotaCache2.java | 46 +++++++++++++
2 files changed, 87 insertions(+), 38 deletions(-)
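For context before the diff: the refresher chore now rebuilds each cache as a ConcurrentHashMap, carries state over from the old map, and publishes the result with a single volatile write, all under a synchronized block. A condensed sketch of that pattern for the user quota cache (names taken from the diff below; the region server, table, and namespace caches follow the same shape; this is an illustrative excerpt, not the full method):

    @Override
    protected void chore() {
      synchronized (this) {
        LOG.info("Reloading quota cache from hbase:quota table");
        updateQuotaFactors();
        try {
          // Build the replacement map as a ConcurrentHashMap so it is safe for
          // concurrent access once published.
          Map<String, UserQuotaState> newUserQuotaCache =
            new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
          // Carry state over from the previous cache, then swap the volatile reference.
          updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
          userQuotaCache = newUserQuotaCache;
        } catch (IOException e) {
          LOG.error("Error while fetching user quotas", e);
        }
        // ... repeated for regionServerQuotaCache, tableQuotaCache, namespaceQuotaCache ...
        fetchExceedThrottleQuota();
      }
    }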
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index c95578dc5d0..34104752e81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
import java.time.Duration;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -70,10 +69,10 @@ public class QuotaCache implements Stoppable {
private final Object initializerLock = new Object();
private volatile boolean initialized = false;
- private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>();
- private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>();
- private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>();
- private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>();
+ private volatile Map<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
+ private volatile Map<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
+ private volatile Map<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
+ private volatile Map<String, QuotaState> regionServerQuotaCache = new ConcurrentHashMap<>();
private volatile boolean exceedThrottleQuotaEnabled = false;
// factors used to divide cluster scope quota into machine scope quota
@@ -310,44 +309,48 @@ public class QuotaCache implements Stoppable {
@Override
protected void chore() {
- updateQuotaFactors();
+ synchronized (this) {
+ LOG.info("Reloading quota cache from hbase:quota table");
+ updateQuotaFactors();
+
+ try {
+ Map<String, UserQuotaState> newUserQuotaCache =
+ new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
+ updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
+ userQuotaCache = newUserQuotaCache;
+ } catch (IOException e) {
+ LOG.error("Error while fetching user quotas", e);
+ }
- try {
- Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries());
- updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
- userQuotaCache = newUserQuotaCache;
- } catch (IOException e) {
- LOG.error("Error while fetching user quotas", e);
- }
+ try {
+ Map<String, QuotaState> newRegionServerQuotaCache =
+ new ConcurrentHashMap<>(fetchRegionServerQuotaStateEntries());
+ updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
+ regionServerQuotaCache = newRegionServerQuotaCache;
+ } catch (IOException e) {
+ LOG.error("Error while fetching region server quotas", e);
+ }
- try {
- Map<String, QuotaState> newRegionServerQuotaCache =
- new HashMap<>(fetchRegionServerQuotaStateEntries());
- updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
- regionServerQuotaCache = newRegionServerQuotaCache;
- } catch (IOException e) {
- LOG.error("Error while fetching region server quotas", e);
- }
+ try {
+ Map<TableName, QuotaState> newTableQuotaCache =
+ new ConcurrentHashMap<>(fetchTableQuotaStateEntries());
+ updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
+ tableQuotaCache = newTableQuotaCache;
+ } catch (IOException e) {
+ LOG.error("Error while refreshing table quotas", e);
+ }
- try {
- Map<TableName, QuotaState> newTableQuotaCache =
- new HashMap<>(fetchTableQuotaStateEntries());
- updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
- tableQuotaCache = newTableQuotaCache;
- } catch (IOException e) {
- LOG.error("Error while refreshing table quotas", e);
- }
+ try {
+ Map<String, QuotaState> newNamespaceQuotaCache =
+ new ConcurrentHashMap<>(fetchNamespaceQuotaStateEntries());
+ updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
+ namespaceQuotaCache = newNamespaceQuotaCache;
+ } catch (IOException e) {
+ LOG.error("Error while refreshing namespace quotas", e);
+ }
- try {
- Map<String, QuotaState> newNamespaceQuotaCache =
- new HashMap<>(fetchNamespaceQuotaStateEntries());
- updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
- namespaceQuotaCache = newNamespaceQuotaCache;
- } catch (IOException e) {
- LOG.error("Error while refreshing namespace quotas", e);
+ fetchExceedThrottleQuota();
}
-
- fetchExceedThrottleQuota();
}
private void fetchExceedThrottleQuota() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
index 8f8ac4991ca..3e829b5c08a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
@@ -131,4 +131,50 @@ public class TestQuotaCache2 {
assertTrue(newCache.containsKey("my_table2"));
assertFalse(newCache.containsKey("my_table1"));
}
+
+ @Test
+ public void testLearnsNewQuota() {
+ Map<String, QuotaState> oldCache = new HashMap<>();
+
+ QuotaState newState = new QuotaState();
+ Map<String, QuotaState> newCache = new HashMap<>();
+ newCache.put("my_table1", newState);
+
+ QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+ assertTrue(newCache.containsKey("my_table1"));
+ }
+
+ @Test
+ public void testUserSpecificOverridesDefaultNewQuota() {
+ // establish old cache with a limiter for 100 read bytes per second
+ QuotaState oldState = new QuotaState();
+ Map<String, QuotaState> oldCache = new HashMap<>();
+ oldCache.put("my_table", oldState);
+ QuotaProtos.Throttle throttle1 = QuotaProtos.Throttle.newBuilder()
+ .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+ .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+ .build();
+ QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(conf, throttle1);
+ oldState.setGlobalLimiter(limiter1);
+
+ // establish new cache, with a limiter for 999 read bytes per second
+ QuotaState newState = new QuotaState();
+ Map<String, QuotaState> newCache = new HashMap<>();
+ newCache.put("my_table", newState);
+ QuotaProtos.Throttle throttle2 = QuotaProtos.Throttle.newBuilder()
+ .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+ .setSoftLimit(999).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+ .build();
+ QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(conf, throttle2);
+ newState.setGlobalLimiter(limiter2);
+
+ // update new cache from old cache
+ QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+ // verify that the 999 available bytes from the limiter was carried over
+ TimeBasedLimiter updatedLimiter =
+ (TimeBasedLimiter) newCache.get("my_table").getGlobalLimiter();
+ assertEquals(999, updatedLimiter.getReadAvailable());
+ }
}