This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 88709c56aa [ISSUE #10543] resolve data race in ServiceThread wakeup mechanism (#10544)
88709c56aa is described below

commit 88709c56aacae16634e7dc8a6a5f06e4a93b678d
Author: SSpirits <[email protected]>
AuthorDate: Tue Jun 30 10:26:23 2026 +0800

    [ISSUE #10543] resolve data race in ServiceThread wakeup mechanism (#10544)
---
 .../org/apache/rocketmq/common/ServiceThread.java  |  45 ++--
 .../apache/rocketmq/common/ServiceThreadTest.java  | 237 ++++++++++++++++++---
 .../java/org/apache/rocketmq/store/CommitLog.java  |   4 +-
 3 files changed, 240 insertions(+), 46 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
index cec00bab02..52b95b6855 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.common;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -28,9 +29,8 @@ public abstract class ServiceThread implements Runnable {
 
     private static final long JOIN_TIME = 90 * 1000;
 
-    protected Thread thread;
-    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
-    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
+    protected volatile Thread thread;
+    protected final AtomicBoolean hasNotified = new AtomicBoolean(false);
     protected volatile boolean stopped = false;
     protected boolean isDaemon = false;
 
@@ -97,32 +97,49 @@ public abstract class ServiceThread implements Runnable {
             return;
         }
         this.stopped = true;
+        // wake up the parked worker so it observes the stop flag promptly
+        wakeup();
         log.info("makestop thread[{}] ", this.getServiceName());
     }
 
     public void wakeup() {
+        if (hasNotified.get()) {
+            return;
+        }
         if (hasNotified.compareAndSet(false, true)) {
-            waitPoint.countDown(); // notify
+            LockSupport.unpark(this.thread); // notify
         }
     }
 
     protected void waitForRunning(long interval) {
+        // Publish the parking thread so wakeup() can target it (also handles restart).
+        this.thread = Thread.currentThread();
+
         if (hasNotified.compareAndSet(true, false)) {
             this.onWaitEnd();
             return;
         }
 
-        //entry to wait
-        waitPoint.reset();
-
-        try {
-            waitPoint.await(interval, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            log.error("Interrupted", e);
-        } finally {
-            hasNotified.set(false);
-            this.onWaitEnd();
+        // LockSupport permits are sticky: an unpark delivered before park makes the next park
+        // return at once, and the loop re-checks hasNotified, so no wakeup can be lost.
+        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(interval);
+        while (!hasNotified.get()) {
+            if (stopped) {
+                break;
+            }
+            long remain = deadline - System.nanoTime();
+            if (remain <= 0) {
+                break;
+            }
+            LockSupport.parkNanos(this, remain);
+            if (Thread.interrupted()) {
+                Thread.currentThread().interrupt();
+                break;
+            }
         }
+
+        hasNotified.set(false);
+        this.onWaitEnd();
     }
 
     protected void onWaitEnd() {
diff --git a/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java b/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java
index 93208bcb7f..e27fd497bd 100644
--- a/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java
@@ -17,10 +17,31 @@
 
 package org.apache.rocketmq.common;
 
-import static org.junit.Assert.assertEquals;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ServiceThread}.
+ *
+ * <p>{@link ServiceThread} used to coordinate {@link ServiceThread#wakeup()} and
+ * {@link ServiceThread#waitForRunning(long)} through a {@code CountDownLatch2} that was
+ * {@code reset()} on every wait. A {@code wakeup()} landing between the fast-path check and
+ * {@code reset()} performed a {@code countDown()} that {@code reset()} immediately discarded, so the
+ * loop blocked for the whole interval while {@code hasNotified} stayed {@code true} (turning every
+ * later {@code wakeup()} into a no-op). See apache/rocketmq#10543.
+ *
+ * <p>The implementation now uses {@link java.util.concurrent.locks.LockSupport} park/unpark, whose
+ * permit semantics cannot drop a signal. The regression tests below assert the wakeup is always
+ * delivered promptly, i.e. never stalls for the full interval.
+ */
 public class ServiceThreadTest {
 
     @Test
@@ -43,49 +64,170 @@ public class ServiceThreadTest {
         ServiceThread testServiceThread = startTestServiceThread();
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
     }
 
-    @Test
+    @Test(timeout = 5000)
     public void testWaitForRunning() {
         ServiceThread testServiceThread = startTestServiceThread();
-        // test waitForRunning
-        testServiceThread.waitForRunning(1000);
+        // Not notified: returns after the (short) interval with the flag cleared.
+        testServiceThread.waitForRunning(50);
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
-        // test wake up
+        // wakeup() arms the notification.
         testServiceThread.wakeup();
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning
-        testServiceThread.waitForRunning(1000);
-        assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
-        // repeat waitForRunning again
-        testServiceThread.waitForRunning(1000);
+        // The next waitForRunning() must consume the notification immediately, never blocking for
+        // the (huge) interval -- this is exactly what the lost-wakeup race used to break.
+        long begin = System.currentTimeMillis();
+        testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+        long elapsed = System.currentTimeMillis() - begin;
         assertEquals(false, testServiceThread.hasNotified.get());
-        assertEquals(1, testServiceThread.waitPoint.getCount());
+        assertTrue("waitForRunning() must fast-path on a pending notification, elapsed=" + elapsed + "ms",
+            elapsed < 1000);
     }
 
-    private ServiceThread startTestServiceThread() {
-        return startTestServiceThread(false);
+    /**
+     * A single {@code wakeup()} must wake a long-interval wait almost immediately, instead of
+     * letting it block for the full interval (which is what the lost-wakeup race used to cause).
+     */
+    @Test(timeout = 5000)
+    public void testWakeupDeliveredPromptly() throws Exception {
+        TestServiceThread service = new TestServiceThread();
+        AtomicBoolean returned = new AtomicBoolean(false);
+        long longInterval = TimeUnit.SECONDS.toMillis(10);
+
+        Thread waiter = new Thread(() -> {
+            service.doWait(longInterval);
+            returned.set(true);
+        }, "waiter");
+        waiter.start();
+
+        // Let the waiter enter the park loop.
+        Thread.sleep(200);
+
+        long startNanos = System.nanoTime();
+        service.wakeup();
+        waiter.join(2000);
+        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+        assertTrue("waitForRunning() did not return after wakeup() within 2s, elapsed=" + elapsedMs + "ms",
+            returned.get());
+        assertTrue("wake latency should be far below the interval, elapsed=" + elapsedMs + "ms",
+            elapsedMs < 2000);
     }
 
-    private ServiceThread startTestServiceThread(boolean daemon) {
-        ServiceThread testServiceThread = new ServiceThread() {
+    /**
+     * Hammer the exact pattern that triggered the lost-wakeup race: a {@code wakeup()} fired right
+     * as the waiter is entering the wait. With the LockSupport-based implementation no signal may be
+     * lost, so every iteration must return well within its (long) interval.
+     */
+    @Test(timeout = 60000)
+    public void testNoWakeupLostUnderStress() throws Exception {
+        int iterations = 1000;
+        long longInterval = TimeUnit.SECONDS.toMillis(5);
+        int lost = 0;
+
+        for (int i = 0; i < iterations; i++) {
+            TestServiceThread service = new TestServiceThread();
+            AtomicBoolean returned = new AtomicBoolean(false);
+
+            Thread waiter = new Thread(() -> {
+                service.doWait(longInterval);
+                returned.set(true);
+            }, "waiter-" + i);
+            waiter.start();
+
+            // Increase the chance the wakeup lands in the CAS-to-park window.
+            Thread.yield();
+            service.wakeup();
+
+            // With the fix the waiter returns in microseconds; a lost signal would block for the
+            // full 5s interval, so a 2s join is more than enough to distinguish the two.
+            waiter.join(2000);
+            if (!returned.get()) {
+                lost++;
+                waiter.interrupt();
+                waiter.join(1000);
+            }
+        }
+
+        assertEquals("ServiceThread must not lose any wakeup signal", 0, lost);
+    }
+
+    /**
+     * Single consumer draining {@code waitForRunning} in a tight loop while several threads race to
+     * {@code wakeup()} it. A lost wakeup shows up as a wait that blocks for the full interval.
+     */
+    @Test(timeout = 30000)
+    public void serviceThreadShouldNotLoseWakeupUnderStress() throws Exception {
+        final int stressIterations = 10000;
+        final int wakerThreads = 4;
+        final long waitTimeoutMs = 20;
+        final long lostWakeupThresholdMs = 18;
+
+        StressServiceThread service = new StressServiceThread();
+        AtomicInteger activeIteration = new AtomicInteger(-1);
+        AtomicInteger completedIteration = new AtomicInteger(-1);
+        AtomicInteger lostWakeups = new AtomicInteger(0);
+        AtomicInteger maxElapsedMs = new AtomicInteger(0);
+        AtomicBoolean running = new AtomicBoolean(true);
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+        ExecutorService executor = Executors.newFixedThreadPool(wakerThreads + 1);
+
+        try {
+            executor.submit(() -> {
+                try {
+                    for (int i = 0; i < stressIterations; i++) {
+                        activeIteration.set(i);
+                        long elapsed = service.awaitOnce(waitTimeoutMs);
+                        maxElapsedMs.accumulateAndGet((int) elapsed, Math::max);
+                        if (elapsed >= lostWakeupThresholdMs) {
+                            lostWakeups.incrementAndGet();
+                            running.set(false);
+                            break;
+                        }
+                        completedIteration.set(i);
+                        Thread.yield();
+                    }
+                } catch (Throwable t) {
+                    failure.compareAndSet(null, t);
+                } finally {
+                    running.set(false);
+                }
+            });
 
-            @Override
-            public void run() {
-                doNothing();
+            for (int w = 0; w < wakerThreads; w++) {
+                executor.submit(() -> {
+                    while (running.get()) {
+                        int iteration = activeIteration.get();
+                        if (iteration >= 0 && completedIteration.get() < iteration) {
+                            service.wakeup();
+                        }
+                        Thread.yield();
+                    }
+                });
             }
 
-            private void doNothing() {}
+            executor.shutdown();
+            assertTrue("stress test did not finish", executor.awaitTermination(25, TimeUnit.SECONDS));
 
-            @Override
-            public String getServiceName() {
-                return "TestServiceThread";
+            Throwable error = failure.get();
+            if (error != null) {
+                throw new AssertionError("stress test failed", error);
             }
-        };
+            assertEquals("ServiceThread lost wakeups under stress (maxElapsedMs=" + maxElapsedMs.get() + ")",
+                0, lostWakeups.get());
+        } finally {
+            running.set(false);
+            executor.shutdownNow();
+        }
+    }
+
+    private ServiceThread startTestServiceThread() {
+        return startTestServiceThread(false);
+    }
+
+    private ServiceThread startTestServiceThread(boolean daemon) {
+        ServiceThread testServiceThread = new TestServiceThread();
         testServiceThread.setDaemon(daemon);
         // test start
         testServiceThread.start();
@@ -108,6 +250,43 @@ public class ServiceThreadTest {
         }
         assertEquals(true, testServiceThread.isStopped());
         assertEquals(true, testServiceThread.hasNotified.get());
-        assertEquals(0, testServiceThread.waitPoint.getCount());
+    }
+
+    private static class TestServiceThread extends ServiceThread {
+
+        @Override
+        public void run() {
+            doNothing();
+        }
+
+        private void doNothing() {
+        }
+
+        @Override
+        public String getServiceName() {
+            return "TestServiceThread";
+        }
+
+        void doWait(long intervalMillis) {
+            waitForRunning(intervalMillis);
+        }
+    }
+
+    private static final class StressServiceThread extends ServiceThread {
+
+        @Override
+        public String getServiceName() {
+            return "StressServiceThread";
+        }
+
+        @Override
+        public void run() {
+        }
+
+        long awaitOnce(long intervalMillis) {
+            long begin = System.nanoTime();
+            waitForRunning(intervalMillis);
+            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
+        }
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 1c46f9e2ce..d2f2da8b7d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1792,9 +1792,7 @@ public class CommitLog implements Swappable {
             synchronized (this.requestsWrite) {
                 this.requestsWrite.add(request);
             }
-            if (hasNotified.compareAndSet(false, true)) {
-                waitPoint.countDown(); // notify
-            }
+            this.wakeup();
             boolean flag = this.requestsWrite.size() >
                 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests();
             if (flag) {

Reply via email to