This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new f446a96b9e7 HBASE-29647 Restore preWALRestore and postWALRestore
coprocessor hooks (#7369)
f446a96b9e7 is described below
commit f446a96b9e73ebf54e95dde391dfce7480d76131
Author: Istvan Toth <[email protected]>
AuthorDate: Thu Oct 9 06:18:18 2025 +0200
HBASE-29647 Restore preWALRestore and postWALRestore coprocessor hooks
(#7369)
Signed-off-by: Duo Zhang <[email protected]>
---
.../hadoop/hbase/coprocessor/RegionObserver.java | 16 ++++++++
.../apache/hadoop/hbase/regionserver/HRegion.java | 13 +++++++
.../hadoop/hbase/regionserver/RSRpcServices.java | 20 ++++++++++
.../hbase/regionserver/RegionCoprocessorHost.java | 25 ++++++++++++
.../coprocessor/SampleRegionWALCoprocessor.java | 32 +++++++++++++++
.../hbase/coprocessor/SimpleRegionObserver.java | 36 +++++++++++++++++
.../coprocessor/TestRegionObserverInterface.java | 45 ++++++++++++++++++++--
.../hadoop/hbase/coprocessor/TestWALObserver.java | 2 +
8 files changed, 185 insertions(+), 4 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 65fe524d0a4..7b7f7e208f5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1412,6 +1412,22 @@ public interface RegionObserver {
RegionInfo info, Path edits) throws IOException {
}
+ /**
+ * Called before a {@link WALEdit} replayed for this region.
+ * @param ctx the environment provided by the region server
+ */
+ default void preWALRestore(ObserverContext<? extends
RegionCoprocessorEnvironment> ctx,
+ RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ }
+
+ /**
+ * Called after a {@link WALEdit} replayed for this region.
+ * @param ctx the environment provided by the region server
+ */
+ default void postWALRestore(ObserverContext<? extends
RegionCoprocessorEnvironment> ctx,
+ RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ }
+
/**
* Called before bulkLoadHFile. Users can create a StoreFile instance to
access the contents of a
* HFile.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7936197ff8d..9b7daee0f66 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5752,6 +5752,15 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
currentReplaySeqId =
(key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() :
currentEditSeqId;
+ // Start coprocessor replay here. The coprocessor is for each WALEdit
+ // instead of a KeyValue.
+ if (coprocessorHost != null) {
+ status.setStatus("Running pre-WAL-restore hook in coprocessors");
+ if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val))
{
+ // if bypass this wal entry, ignore it ...
+ continue;
+ }
+ }
boolean checkRowWithinBoundary = false;
// Check this edit is for this region.
if (
@@ -5822,6 +5831,10 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
internalFlushcache(null, currentEditSeqId, stores.values(),
status, false,
FlushLifeCycleTracker.DUMMY);
}
+
+ if (coprocessorHost != null) {
+ coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
+ }
}
if (coprocessorHost != null) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 11d5917dda6..fdfea375e09 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2114,6 +2114,7 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
? region.getCoprocessorHost()
: null; // do not invoke coprocessors if this is a secondary region
replica
+ List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
// Skip adding the edits to WAL if this is a secondary region replica
boolean isPrimary =
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -2135,6 +2136,18 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<>();
List<MutationReplay> edits =
WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry,
durability);
+ if (coprocessorHost != null) {
+ // Start coprocessor replay here. The coprocessor is for each
WALEdit instead of a
+ // KeyValue.
+ if (
+ coprocessorHost.preWALRestore(region.getRegionInfo(),
walEntry.getFirst(),
+ walEntry.getSecond())
+ ) {
+ // if bypass this log entry, ignore it ...
+ continue;
+ }
+ walEntries.add(walEntry);
+ }
if (edits != null && !edits.isEmpty()) {
// HBASE-17924
// sort to improve lock efficiency
@@ -2157,6 +2170,13 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
if (wal != null) {
wal.sync();
}
+
+ if (coprocessorHost != null) {
+ for (Pair<WALKey, WALEdit> entry : walEntries) {
+ coprocessorHost.postWALRestore(region.getRegionInfo(),
entry.getFirst(),
+ entry.getSecond());
+ }
+ }
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 703f06141bf..b300496e1d7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1426,6 +1426,31 @@ public class RegionCoprocessorHost
});
}
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @return true if default behavior should be bypassed, false otherwise
+ */
+ public boolean preWALRestore(final RegionInfo info, final WALKey logKey,
final WALEdit logEdit)
+ throws IOException {
+ return execOperation(
+ coprocEnvironments.isEmpty() ? null : new
RegionObserverOperationWithoutResult(true) {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preWALRestore(this, info, logKey, logEdit);
+ }
+ });
+ }
+
+ public void postWALRestore(final RegionInfo info, final WALKey logKey, final
WALEdit logEdit)
+ throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new
RegionObserverOperationWithoutResult() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.postWALRestore(this, info, logKey, logEdit);
+ }
+ });
+ }
+
/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
*/
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
index 17ab26c6a58..8d6d363daa6 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
@@ -53,6 +53,8 @@ public class SampleRegionWALCoprocessor
private boolean preWALWriteCalled = false;
private boolean postWALWriteCalled = false;
+ private boolean preWALRestoreCalled = false;
+ private boolean postWALRestoreCalled = false;
private boolean preWALRollCalled = false;
private boolean postWALRollCalled = false;
private boolean preReplayWALsCalled = false;
@@ -74,6 +76,8 @@ public class SampleRegionWALCoprocessor
this.changedQualifier = chq;
preWALWriteCalled = false;
postWALWriteCalled = false;
+ preWALRestoreCalled = false;
+ postWALRestoreCalled = false;
preWALRollCalled = false;
postWALRollCalled = false;
}
@@ -130,6 +134,15 @@ public class SampleRegionWALCoprocessor
}
}
+ /**
+ * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion}
when WAL is Restoreed.
+ */
+ @Override
+ public void preWALRestore(ObserverContext<? extends
RegionCoprocessorEnvironment> env,
+ RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ preWALRestoreCalled = true;
+ }
+
@Override
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment>
ctx, Path oldPath,
Path newPath) throws IOException {
@@ -142,6 +155,15 @@ public class SampleRegionWALCoprocessor
postWALRollCalled = true;
}
+ /**
+ * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when
WAL is Restoreed.
+ */
+ @Override
+ public void postWALRestore(ObserverContext<? extends
RegionCoprocessorEnvironment> env,
+ RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ postWALRestoreCalled = true;
+ }
+
@Override
public void preReplayWALs(ObserverContext<? extends
RegionCoprocessorEnvironment> ctx,
RegionInfo info, Path edits) throws IOException {
@@ -162,6 +184,16 @@ public class SampleRegionWALCoprocessor
return postWALWriteCalled;
}
+ public boolean isPreWALRestoreCalled() {
+ LOG.debug(SampleRegionWALCoprocessor.class.getName() +
".isPreWALRestoreCalled is called.");
+ return preWALRestoreCalled;
+ }
+
+ public boolean isPostWALRestoreCalled() {
+ LOG.debug(SampleRegionWALCoprocessor.class.getName() +
".isPostWALRestoreCalled is called.");
+ return postWALRestoreCalled;
+ }
+
public boolean isPreWALRollCalled() {
return preWALRollCalled;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 42ddf84b877..ec32a1d2c4d 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -128,6 +128,8 @@ public class SimpleRegionObserver implements
RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
final AtomicInteger ctPreReplayWALs = new AtomicInteger(0);
final AtomicInteger ctPostReplayWALs = new AtomicInteger(0);
+ final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
+ final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
@@ -696,6 +698,24 @@ public class SimpleRegionObserver implements
RegionCoprocessor, RegionObserver {
ctPostReplayWALs.incrementAndGet();
}
+ @Override
+ public void preWALRestore(ObserverContext<? extends
RegionCoprocessorEnvironment> env,
+ RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ String tableName = logKey.getTableName().getNameAsString();
+ if (tableName.equals(TABLE_SKIPPED)) {
+ // skip recovery of TABLE_SKIPPED for testing purpose
+ env.bypass();
+ return;
+ }
+ ctPreWALRestore.incrementAndGet();
+ }
+
+ @Override
+ public void postWALRestore(ObserverContext<? extends
RegionCoprocessorEnvironment> env,
+ RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ ctPostWALRestore.incrementAndGet();
+ }
+
@Override
public StoreFileReader preStoreFileReaderOpen(
ObserverContext<? extends RegionCoprocessorEnvironment> ctx, FileSystem
fs, Path p,
@@ -912,6 +932,14 @@ public class SimpleRegionObserver implements
RegionCoprocessor, RegionObserver {
return ctPostReplayWALs.get() > 0;
}
+ public boolean hadPreWALRestore() {
+ return ctPreWALRestore.get() > 0;
+ }
+
+ public boolean hadPostWALRestore() {
+ return ctPostWALRestore.get() > 0;
+ }
+
public boolean wasScannerNextCalled() {
return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0;
}
@@ -1024,6 +1052,14 @@ public class SimpleRegionObserver implements
RegionCoprocessor, RegionObserver {
return ctPostReplayWALs.get();
}
+ public int getCtPreWALRestore() {
+ return ctPreWALRestore.get();
+ }
+
+ public int getCtPostWALRestore() {
+ return ctPostWALRestore.get();
+ }
+
public int getCtPreWALAppend() {
return ctPreWALAppend.get();
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 81b51659571..df57add1708 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -768,8 +768,9 @@ public class TestRegionObserverInterface {
tableName, new Boolean[] { false, false, true, true, true, true, false
});
verifyMethodResult(SimpleRegionObserver.class,
- new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs",
"getCtPrePut", "getCtPostPut" },
- tableName, new Integer[] { 0, 0, 2, 2 });
+ new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs",
"getCtPreWALRestore",
+ "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
+ tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });
cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(1000); // Let the kill soak in.
@@ -777,14 +778,50 @@ public class TestRegionObserverInterface {
LOG.info("All regions assigned");
verifyMethodResult(SimpleRegionObserver.class,
- new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs",
"getCtPrePut", "getCtPostPut" },
- tableName, new Integer[] { 1, 1, 0, 0 });
+ new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs",
"getCtPreWALRestore",
+ "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
+ tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
} finally {
util.deleteTable(tableName);
table.close();
}
}
+ @Test
+ public void testPreWALRestoreSkip() throws Exception {
+ LOG.info(TestRegionObserverInterface.class.getName() + "." +
name.getMethodName());
+ TableName tableName =
TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
+ Table table = util.createTable(tableName, new byte[][] { A, B, C });
+
+ try (RegionLocator locator =
util.getConnection().getRegionLocator(tableName)) {
+ JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+ ServerName sn2 = rs1.getRegionServer().getServerName();
+ String regEN =
locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
+
+ util.getAdmin().move(Bytes.toBytes(regEN), sn2);
+ while
(!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
+ Thread.sleep(100);
+ }
+
+ Put put = new Put(ROW);
+ put.addColumn(A, A, A);
+ put.addColumn(B, B, B);
+ put.addColumn(C, C, C);
+ table.put(put);
+
+ cluster.killRegionServer(rs1.getRegionServer().getServerName());
+ Threads.sleep(20000); // just to be sure that the kill has fully started.
+ util.waitUntilAllRegionsAssigned(tableName);
+ }
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
+ new Integer[] { 0, 0 });
+
+ util.deleteTable(tableName);
+ table.close();
+ }
+
// called from testPreWALAppendIsWrittenToWAL
private void testPreWALAppendHook(Table table, TableName tableName) throws
IOException {
int expectedCalls = 0;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 989110e41d9..6e7c6ff400a 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -357,6 +357,8 @@ public class TestWALObserver {
SampleRegionWALCoprocessor cp2 =
region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
assertNotNull(cp2);
+ assertTrue(cp2.isPreWALRestoreCalled());
+ assertTrue(cp2.isPostWALRestoreCalled());
assertTrue(cp2.isPreReplayWALsCalled());
assertTrue(cp2.isPostReplayWALsCalled());
region.close();