This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new a6a0a567ece HBASE-29713 resolving race condition while acquiring and
releasing the worker for splitWalProcedure (#7465)
a6a0a567ece is described below
commit a6a0a567ece42f671a18f9c3536e55c1facd9dfc
Author: Umesh <[email protected]>
AuthorDate: Mon Nov 17 11:48:40 2025 +0530
HBASE-29713 resolving race condition while acquiring and releasing the
worker for splitWalProcedure (#7465)
Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Aman Poonia <[email protected]>
---
.../hadoop/hbase/master/SplitWALManager.java | 4 +-
.../hadoop/hbase/master/TestSplitWALManager.java | 97 ++++++++++++++++++++++
2 files changed, 99 insertions(+), 2 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index a474b566fdf..a3b96c6f80b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -225,12 +225,12 @@ public class SplitWALManager {
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
}
- public void suspend(Procedure<?> proc) {
+ public synchronized void suspend(Procedure<?> proc) {
event.suspend();
event.suspendIfNotReady(proc);
}
- public void wake(MasterProcedureScheduler scheduler) {
+ public synchronized void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
event.wake(scheduler);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
index 8818609c731..20d871959e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -302,6 +302,103 @@ public class TestSplitWALManager {
failedProcedure.countDown();
}
+ /**
+ * Test the race condition between suspend() and wake() in SplitWorkerAssigner. This test
+ * reproduces the issue where a procedure can be lost if wake() is called between event.suspend()
+ * and event.suspendIfNotReady() in the suspend() method.
+ * <p>
+ * The race condition happens when: 1. Thread-1: calls suspend() which sets event.ready=false 2.
+ * Thread-2: calls wake() which sees ready=false and marks event.ready=true 3. Thread-1: calls
+ * suspendIfNotReady() which sees ready=true and doesn't add procedure 4. Result: Procedure is in
+ * WAITING state but not in suspended queue, never woken up
+ */
+ @Test
+ public void testSuspendWakeRaceCondition() throws Exception {
+ final int NUM_ITERATIONS = 50; // Run multiple times to increase chance of race
+ final int NUM_PROCEDURES = 10;
+
+ for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+ List<FakeServerProcedure> testProcedures = new ArrayList<>();
+
+ // Fill all worker slots (3 servers * 1 max splitter = 3 workers)
+ for (int i = 0; i < 3; i++) {
+ FakeServerProcedure procedure =
+ new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
+ testProcedures.add(procedure);
+ ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ // Wait for all workers to be acquired
+ TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
+
+ // Create additional procedures that will need to suspend
+ List<FakeServerProcedure> suspendedProcedures = new ArrayList<>();
+ for (int i = 0; i < NUM_PROCEDURES; i++) {
+ FakeServerProcedure procedure =
+ new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
+ suspendedProcedures.add(procedure);
+ }
+
+ // Submit all suspended procedures in parallel to create contention
+ CountDownLatch startLatch = new CountDownLatch(1);
+ List<Thread> submitterThreads = new ArrayList<>();
+
+ for (FakeServerProcedure proc : suspendedProcedures) {
+ Thread t = new Thread(() -> {
+ try {
+ startLatch.await();
+ ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), proc,
+ HConstants.NO_NONCE, HConstants.NO_NONCE);
+ } catch (Exception e) {
+ LOG.error("Failed to submit procedure", e);
+ }
+ });
+ t.start();
+ submitterThreads.add(t);
+ }
+
+ // Start all submissions at once
+ startLatch.countDown();
+
+ // Simultaneously release workers to create race between suspend and wake
+ Thread releaseThread = new Thread(() -> {
+ try {
+ Thread.sleep(10); // Small delay to ensure some procedures are suspending
+ for (FakeServerProcedure proc : testProcedures) {
+ proc.countDown();
+ Thread.sleep(1); // Stagger releases slightly
+ }
+ for (FakeServerProcedure proc : suspendedProcedures) {
+ proc.countDown();
+ Thread.sleep(1); // Stagger releases slightly
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to release workers", e);
+ }
+ });
+ releaseThread.start();
+
+ // Wait for all threads to finish
+ for (Thread t : submitterThreads) {
+ t.join(5000);
+ }
+ releaseThread.join(5000);
+
+ // All suspended procedures should eventually acquire workers and complete
+ // This will fail if the race condition causes a procedure to be lost
+ for (FakeServerProcedure proc : suspendedProcedures) {
+ TEST_UTIL.waitFor(30000, 1000, proc::isWorkerAcquired);
+ TEST_UTIL.waitFor(10000, proc::isSuccess);
+ }
+
+ // Also check for the initial 3 procedures to complete
+ for (FakeServerProcedure proc : testProcedures) {
+ TEST_UTIL.waitFor(10000, proc::isSuccess);
+ }
+ }
+ }
+
public static final class FakeServerProcedure
extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
implements ServerProcedureInterface {