This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new a6a0a567ece HBASE-29713 resolving race condition while acquiring and releasing the worker for splitWalProcedure (#7465)
a6a0a567ece is described below

commit a6a0a567ece42f671a18f9c3536e55c1facd9dfc
Author: Umesh <[email protected]>
AuthorDate: Mon Nov 17 11:48:40 2025 +0530

    HBASE-29713 resolving race condition while acquiring and releasing the worker for splitWalProcedure (#7465)
    
    Signed-off-by: Andrew Purtell <[email protected]>
    Signed-off-by: Viraj Jasani <[email protected]>
    Signed-off-by: Aman Poonia <[email protected]>
---
 .../hadoop/hbase/master/SplitWALManager.java       |  4 +-
 .../hadoop/hbase/master/TestSplitWALManager.java   | 97 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index a474b566fdf..a3b96c6f80b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -225,12 +225,12 @@ public class SplitWALManager {
       currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
     }
 
-    public void suspend(Procedure<?> proc) {
+    public synchronized void suspend(Procedure<?> proc) {
       event.suspend();
       event.suspendIfNotReady(proc);
     }
 
-    public void wake(MasterProcedureScheduler scheduler) {
+    public synchronized void wake(MasterProcedureScheduler scheduler) {
       if (!event.isReady()) {
         event.wake(scheduler);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
index 8818609c731..20d871959e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -302,6 +302,103 @@ public class TestSplitWALManager {
     failedProcedure.countDown();
   }
 
+  /**
+   * Test the race condition between suspend() and wake() in SplitWorkerAssigner. This test
+   * reproduces the issue where a procedure can be lost if wake() is called between event.suspend()
+   * and event.suspendIfNotReady() in the suspend() method.
+   * <p>
+   * The race condition happens when: 1. Thread-1: calls suspend() which sets event.ready=false 2.
+   * Thread-2: calls wake() which sees ready=false and marks event.ready=true 3. Thread-1: calls
+   * suspendIfNotReady() which sees ready=true and doesn't add procedure 4. Result: Procedure is in
+   * WAITING state but not in suspended queue, never woken up
+   */
+  @Test
+  public void testSuspendWakeRaceCondition() throws Exception {
+    final int NUM_ITERATIONS = 50; // Run multiple times to increase chance of 
race
+    final int NUM_PROCEDURES = 10;
+
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      List<FakeServerProcedure> testProcedures = new ArrayList<>();
+
+      // Fill all worker slots (3 servers * 1 max splitter = 3 workers)
+      for (int i = 0; i < 3; i++) {
+        FakeServerProcedure procedure =
+          new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
+        testProcedures.add(procedure);
+        
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), 
procedure,
+          HConstants.NO_NONCE, HConstants.NO_NONCE);
+      }
+
+      // Wait for all workers to be acquired
+      TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
+
+      // Create additional procedures that will need to suspend
+      List<FakeServerProcedure> suspendedProcedures = new ArrayList<>();
+      for (int i = 0; i < NUM_PROCEDURES; i++) {
+        FakeServerProcedure procedure =
+          new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
+        suspendedProcedures.add(procedure);
+      }
+
+      // Submit all suspended procedures in parallel to create contention
+      CountDownLatch startLatch = new CountDownLatch(1);
+      List<Thread> submitterThreads = new ArrayList<>();
+
+      for (FakeServerProcedure proc : suspendedProcedures) {
+        Thread t = new Thread(() -> {
+          try {
+            startLatch.await();
+            
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), 
proc,
+              HConstants.NO_NONCE, HConstants.NO_NONCE);
+          } catch (Exception e) {
+            LOG.error("Failed to submit procedure", e);
+          }
+        });
+        t.start();
+        submitterThreads.add(t);
+      }
+
+      // Start all submissions at once
+      startLatch.countDown();
+
+      // Simultaneously release workers to create race between suspend and wake
+      Thread releaseThread = new Thread(() -> {
+        try {
+          Thread.sleep(10); // Small delay to ensure some procedures are 
suspending
+          for (FakeServerProcedure proc : testProcedures) {
+            proc.countDown();
+            Thread.sleep(1); // Stagger releases slightly
+          }
+          for (FakeServerProcedure proc : suspendedProcedures) {
+            proc.countDown();
+            Thread.sleep(1); // Stagger releases slightly
+          }
+        } catch (Exception e) {
+          LOG.error("Failed to release workers", e);
+        }
+      });
+      releaseThread.start();
+
+      // Wait for all threads to finish
+      for (Thread t : submitterThreads) {
+        t.join(5000);
+      }
+      releaseThread.join(5000);
+
+      // All suspended procedures should eventually acquire workers and complete
+      // This will fail if the race condition causes a procedure to be lost
+      for (FakeServerProcedure proc : suspendedProcedures) {
+        TEST_UTIL.waitFor(30000, 1000, proc::isWorkerAcquired);
+        TEST_UTIL.waitFor(10000, proc::isSuccess);
+      }
+
+      // Also check for the initial 3 procedures to complete
+      for (FakeServerProcedure proc : testProcedures) {
+        TEST_UTIL.waitFor(10000, proc::isSuccess);
+      }
+    }
+  }
+
   public static final class FakeServerProcedure
     extends StateMachineProcedure<MasterProcedureEnv, 
MasterProcedureProtos.SplitWALState>
     implements ServerProcedureInterface {

Reply via email to