This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 95bd8a3da87 HBASE-29401 Support invalidate meta cache when tables
dropped or disabled (#7117)
95bd8a3da87 is described below
commit 95bd8a3da87e8c106e4eedec1726234681b7d859
Author: chaijunjie0101 <[email protected]>
AuthorDate: Mon Aug 25 23:12:16 2025 +0800
HBASE-29401 Support invalidate meta cache when tables dropped or disabled
(#7117)
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Chandra Kambham <[email protected]>
---
.../hbase/client/AsyncNonMetaRegionLocator.java | 78 ++++++++++++++-
.../hadoop/hbase/client/AsyncRegionLocator.java | 2 +-
.../client/TestAsyncNonMetaRegionLocator.java | 2 +-
...stAsyncNonMetaRegionLocatorConcurrenyLimit.java | 2 +-
.../hbase/client/TestAsyncTableLocatePrefetch.java | 3 +-
.../hbase/client/TestRegionLocationCaching.java | 107 +++++++++++++++++++++
6 files changed, 189 insertions(+), 5 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index ead8362fab1..8c23ef181bf 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -57,10 +57,15 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
+
/**
* The asynchronous locator for regions other than meta.
*/
@@ -91,6 +96,8 @@ class AsyncNonMetaRegionLocator {
private final ConcurrentMap<TableName, TableCache> cache = new
ConcurrentHashMap<>();
+ private final HashedWheelTimer retryTimer;
+
private static final class LocateRequest {
private final byte[] row;
@@ -212,7 +219,7 @@ class AsyncNonMetaRegionLocator {
}
}
- AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
+ AsyncNonMetaRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer
retryTimer) {
this.conn = conn;
this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE,
DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
@@ -253,6 +260,20 @@ class AsyncNonMetaRegionLocator {
default:
// Doing nothing
}
+
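+    // Interval, in milliseconds, of the task that invalidates the cached locations of tables
+    // which have been disabled or deleted. 0 (the default) disables the task. It is not needed
+    // when tables are disabled/deleted through this same connection, or when a new connection
+    // is usually created afterwards. Since tables are rarely disabled/deleted, an interval of
+    // 24h or more is suggested.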
+ this.retryTimer = retryTimer;
+ long metaCacheInvalidateInterval = conn.getConfiguration()
+ .getLong("hbase.client.connection.metacache.invalidate-interval.ms", 0L);
+ if (metaCacheInvalidateInterval > 0) {
+ TimerTask invalidateMetaCacheTask =
getInvalidateMetaCacheTask(metaCacheInvalidateInterval);
+ this.retryTimer.newTimeout(invalidateMetaCacheTask,
metaCacheInvalidateInterval,
+ TimeUnit.MILLISECONDS);
+ }
}
private TableCache getTableCache(TableName tableName) {
@@ -618,4 +639,59 @@ class AsyncNonMetaRegionLocator {
return tableCache.regionLocationCache.getAll().stream()
.mapToInt(RegionLocations::numNonNullElements).sum();
}
+
+  /**
+   * Creates the periodic task that drops cached locations of disabled or deleted tables.
+   * <p>
+   * Each run performs one pass through {@link #invalidateTableCache()}. Once that pass has
+   * finished, successfully or not, the task re-arms itself on {@code retryTimer} (the timer this
+   * locator was constructed with) after {@code metaCacheInvalidateInterval} milliseconds. It
+   * stops re-arming itself once the connection is closed, so a closed connection is neither kept
+   * reachable by the timer nor used for further admin calls.
+   * @param metaCacheInvalidateInterval delay in milliseconds between two consecutive runs
+   * @return the self-rescheduling timer task
+   */
+  private TimerTask getInvalidateMetaCacheTask(long metaCacheInvalidateInterval) {
+    return new TimerTask() {
+      @Override
+      public void run(Timeout timeout) throws Exception {
+        if (conn.isClosed()) {
+          // Connection is gone: stop the periodic invalidation for good.
+          return;
+        }
+        FutureUtils.addListener(invalidateTableCache(), (res, err) -> {
+          if (err != null) {
+            LOG.warn("InvalidateTableCache failed.", err);
+          }
+          if (conn.isClosed()) {
+            // The connection was closed while this pass was running, do not re-arm.
+            return;
+          }
+          // Reschedule on the injected timer, the same one used for the first run, rather than
+          // the static AsyncConnectionImpl.RETRY_TIMER. Inside this lambda `this` is the
+          // enclosing anonymous TimerTask.
+          retryTimer.newTimeout(this, metaCacheInvalidateInterval, TimeUnit.MILLISECONDS);
+        });
+      }
+    };
+  }
+
+  /**
+   * Starts one pass over every table that currently has cached region locations.
+   * @return a future that {@code invalidateCache} completes when it reaches the end of the
+   *         table iterator
+   */
+  private CompletableFuture<Void> invalidateTableCache() {
+    CompletableFuture<Void> done = new CompletableFuture<>();
+    // Arguments are evaluated left to right: the key iterator is taken before the admin.
+    invalidateCache(done, cache.keySet().iterator(), conn.getAdmin());
+    return done;
+  }
+
+  /**
+   * Visits the cached table names one at a time and drops the cached locations of every table
+   * that is disabled or no longer exists.
+   * <p>
+   * The next table is only checked after the asynchronous state lookup of the current one has
+   * finished. A lookup failure other than {@link TableNotFoundException} skips just that table;
+   * the walk always continues, so {@code future} is always completed once the iterator is
+   * exhausted.
+   * @param future  completed with {@code null} after all tables from the iterator were visited
+   * @param tbnIter iterator over the table names to check
+   * @param admin   admin used to look up the state of each table
+   */
+  private void invalidateCache(CompletableFuture<Void> future, Iterator<TableName> tbnIter,
+    AsyncAdmin admin) {
+    if (!tbnIter.hasNext()) {
+      future.complete(null);
+      return;
+    }
+    TableName tableName = tbnIter.next();
+    FutureUtils.addListener(admin.isTableDisabled(tableName), (tableDisabled, err) -> {
+      boolean shouldInvalidateCache = false;
+      if (err != null) {
+        if (err instanceof TableNotFoundException) {
+          LOG.info("Table {} was not exist, will invalidate its cache.", tableName);
+          shouldInvalidateCache = true;
+        } else {
+          // Skip only this table. Returning here would abandon the remaining tables and leave
+          // the future incomplete, so a periodic caller waiting on it would never run again.
+          LOG.warn("Get table state of {} failed, skip to invalidate its cache.", tableName, err);
+        }
+      } else if (tableDisabled) {
+        LOG.info("Table {} was disabled, will invalidate its cache.", tableName);
+        shouldInvalidateCache = true;
+      }
+      if (shouldInvalidateCache) {
+        clearCache(tableName);
+        LOG.info("Invalidate cache for {} succeed.", tableName);
+      } else if (err == null) {
+        LOG.debug("Table {} is normal, no need to invalidate its cache.", tableName);
+      }
+      invalidateCache(future, tbnIter, admin);
+    });
+  }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index da58dd8e1e5..0e872a5b21d 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -71,7 +71,7 @@ class AsyncRegionLocator {
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
this.conn = conn;
this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
- this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
+ this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn,
retryTimer);
this.retryTimer = retryTimer;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 6a5230b3a12..0bfbd18eb32 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -131,7 +131,7 @@ public class TestAsyncNonMetaRegionLocator {
ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(),
User.getCurrent());
conn =
new AsyncConnectionImpl(c, registry, registry.getClusterId().get(),
null, User.getCurrent());
- locator = new AsyncNonMetaRegionLocator(conn);
+ locator = new AsyncNonMetaRegionLocator(conn,
AsyncConnectionImpl.RETRY_TIMER);
}
@After
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index cfcec727981..4529c07dfd1 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -128,7 +128,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(),
User.getCurrent());
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
registry.getClusterId().get(), null, User.getCurrent());
- LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+ LOCATOR = new AsyncNonMetaRegionLocator(CONN,
AsyncConnectionImpl.RETRY_TIMER);
SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i ->
Bytes.toBytes(String.format("%02x", i)))
.toArray(byte[][]::new);
TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
index 636093cd82f..a9f9a276294 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java
@@ -58,7 +58,8 @@ public class TestAsyncTableLocatePrefetch {
TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
CONN =
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
- LOCATOR = new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN);
+ LOCATOR =
+ new AsyncNonMetaRegionLocator((AsyncConnectionImpl) CONN,
AsyncConnectionImpl.RETRY_TIMER);
}
@AfterClass
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
index c949463ab59..fb1ecd50d1d 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocationCaching.java
@@ -17,18 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -110,6 +115,21 @@ public class TestRegionLocationCaching {
}
}
+  /**
+   * Asserts that the connection holds no cached region location for the given table. The check
+   * runs 50 times, 100 ms apart, and must pass on every run, so the cache has to stay empty for
+   * the whole window of roughly 5 seconds.
+   */
+  private void checkRegionLocationIsNotCached(final TableName tableName, final Connection conn)
+    throws InterruptedException {
+    for (int attemptsLeft = 50; attemptsLeft > 0; attemptsLeft--) {
+      AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection();
+      int cachedLocations = asyncConn.getLocator().getNumberOfCachedRegionLocations(tableName);
+      assertEquals("Expected zero number of cached region locations", 0, cachedLocations);
+      Thread.sleep(100);
+    }
+  }
+
/**
* Method to check whether the passed row exists in the given table
*/
@@ -129,4 +149,91 @@ public class TestRegionLocationCaching {
} while (r == null || r.getValue(family, qualifier) == null);
}
}
+
+  @Test
+  public void testInvalidateMetaCache() throws Throwable {
+    // observerConn caches the region locations of two tables and runs the periodic meta cache
+    // invalidation task every 5 s. adminConn first disables one table, then disables and deletes
+    // the other. observerConn must drop the stale locations of each affected table by itself.
+    ColumnFamilyDescriptor family =
+      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build();
+    TableName firstTable = TableName.valueOf("testInvalidateMetaCache1");
+    TableDescriptor firstDesc =
+      TableDescriptorBuilder.newBuilder(firstTable).setColumnFamily(family).build();
+    TableName secondTable = TableName.valueOf("testInvalidateMetaCache2");
+    TableDescriptor secondDesc =
+      TableDescriptorBuilder.newBuilder(secondTable).setColumnFamily(family).build();
+
+    Configuration observerConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    observerConf.setLong("hbase.client.connection.metacache.invalidate-interval.ms", 5_000L);
+
+    Connection observerConn = ConnectionFactory.createConnection(observerConf);
+    Connection adminConn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    try {
+      Admin setupAdmin = observerConn.getAdmin();
+      setupAdmin.createTable(firstDesc);
+      setupAdmin.createTable(secondDesc);
+      // Populate observerConn's location cache for both tables.
+      observerConn.getRegionLocator(firstTable).getAllRegionLocations();
+      observerConn.getRegionLocator(secondTable).getAllRegionLocations();
+      checkRegionLocationIsCached(firstTable, observerConn);
+      checkRegionLocationIsCached(secondTable, observerConn);
+
+      Admin remoteAdmin = adminConn.getAdmin();
+      remoteAdmin.disableTable(firstTable);
+      // Sleep for two invalidation intervals so the periodic task has had time to run.
+      Threads.sleep(10_000);
+      checkRegionLocationIsNotCached(firstTable, observerConn);
+      checkRegionLocationIsCached(secondTable, observerConn);
+
+      remoteAdmin.disableTable(secondTable);
+      remoteAdmin.deleteTable(secondTable);
+      Threads.sleep(10_000);
+      checkRegionLocationIsNotCached(firstTable, observerConn);
+      checkRegionLocationIsNotCached(secondTable, observerConn);
+    } finally {
+      IOUtils.closeQuietly(observerConn, adminConn);
+    }
+  }
+
+  @Test
+  public void testDisableInvalidateMetaCache() throws Throwable {
+    // The invalidation interval is left unset here, so the periodic task is not scheduled:
+    // disabling and deleting tables through adminConn must not evict anything from
+    // observerConn's location cache.
+    ColumnFamilyDescriptor family =
+      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build();
+    TableName firstTable = TableName.valueOf("testDisableInvalidateMetaCache1");
+    TableDescriptor firstDesc =
+      TableDescriptorBuilder.newBuilder(firstTable).setColumnFamily(family).build();
+    TableName secondTable = TableName.valueOf("testDisableInvalidateMetaCache2");
+    TableDescriptor secondDesc =
+      TableDescriptorBuilder.newBuilder(secondTable).setColumnFamily(family).build();
+
+    Connection observerConn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    Connection adminConn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    try {
+      Admin setupAdmin = observerConn.getAdmin();
+      setupAdmin.createTable(firstDesc);
+      setupAdmin.createTable(secondDesc);
+      // Populate observerConn's location cache for both tables.
+      observerConn.getRegionLocator(firstTable).getAllRegionLocations();
+      observerConn.getRegionLocator(secondTable).getAllRegionLocations();
+      checkRegionLocationIsCached(firstTable, observerConn);
+      checkRegionLocationIsCached(secondTable, observerConn);
+
+      Admin remoteAdmin = adminConn.getAdmin();
+      remoteAdmin.disableTable(firstTable);
+      Threads.sleep(10_000);
+      checkRegionLocationIsCached(firstTable, observerConn);
+      checkRegionLocationIsCached(secondTable, observerConn);
+
+      remoteAdmin.disableTable(secondTable);
+      remoteAdmin.deleteTable(secondTable);
+      Threads.sleep(10_000);
+      checkRegionLocationIsCached(firstTable, observerConn);
+      checkRegionLocationIsCached(secondTable, observerConn);
+    } finally {
+      IOUtils.closeQuietly(observerConn, adminConn);
+    }
+  }
}