This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 88709c56aa [ISSUE #10543] resolve data race in ServiceThread wakeup mechanism (#10544)
88709c56aa is described below
commit 88709c56aacae16634e7dc8a6a5f06e4a93b678d
Author: SSpirits <[email protected]>
AuthorDate: Tue Jun 30 10:26:23 2026 +0800
[ISSUE #10543] resolve data race in ServiceThread wakeup mechanism (#10544)
---
.../org/apache/rocketmq/common/ServiceThread.java | 45 ++--
.../apache/rocketmq/common/ServiceThreadTest.java | 237 ++++++++++++++++++---
.../java/org/apache/rocketmq/store/CommitLog.java | 4 +-
3 files changed, 240 insertions(+), 46 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
index cec00bab02..52b95b6855 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.common;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -28,9 +29,8 @@ public abstract class ServiceThread implements Runnable {
private static final long JOIN_TIME = 90 * 1000;
- protected Thread thread;
- protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
- protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
+ protected volatile Thread thread;
+ protected final AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
protected boolean isDaemon = false;
@@ -97,32 +97,49 @@ public abstract class ServiceThread implements Runnable {
return;
}
this.stopped = true;
+ // wake up the parked worker so it observes the stop flag promptly
+ wakeup();
log.info("makestop thread[{}] ", this.getServiceName());
}
public void wakeup() {
+ if (hasNotified.get()) {
+ return;
+ }
if (hasNotified.compareAndSet(false, true)) {
- waitPoint.countDown(); // notify
+ LockSupport.unpark(this.thread); // notify
}
}
protected void waitForRunning(long interval) {
+ // Publish the parking thread so wakeup() can target it (also handles
restart).
+ this.thread = Thread.currentThread();
+
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
- //entry to wait
- waitPoint.reset();
-
- try {
- waitPoint.await(interval, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.error("Interrupted", e);
- } finally {
- hasNotified.set(false);
- this.onWaitEnd();
+ // LockSupport permits are sticky: an unpark delivered before park
makes the next park
+ // return at once, and the loop re-checks hasNotified, so no wakeup
can be lost.
+ long deadline = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(interval);
+ while (!hasNotified.get()) {
+ if (stopped) {
+ break;
+ }
+ long remain = deadline - System.nanoTime();
+ if (remain <= 0) {
+ break;
+ }
+ LockSupport.parkNanos(this, remain);
+ if (Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ break;
+ }
}
+
+ hasNotified.set(false);
+ this.onWaitEnd();
}
protected void onWaitEnd() {
diff --git a/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java b/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java
index 93208bcb7f..e27fd497bd 100644
--- a/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java
@@ -17,10 +17,31 @@
package org.apache.rocketmq.common;
-import static org.junit.Assert.assertEquals;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ServiceThread}.
+ *
+ * <p>{@link ServiceThread} used to coordinate {@link ServiceThread#wakeup()}
and
+ * {@link ServiceThread#waitForRunning(long)} through a {@code
CountDownLatch2} that was
+ * {@code reset()} on every wait. A {@code wakeup()} landing between the
fast-path check and
+ * {@code reset()} performed a {@code countDown()} that {@code reset()}
immediately discarded, so the
+ * loop blocked for the whole interval while {@code hasNotified} stayed {@code
true} (turning every
+ * later {@code wakeup()} into a no-op). See apache/rocketmq#10543.
+ *
+ * <p>The implementation now uses {@link
java.util.concurrent.locks.LockSupport} park/unpark, whose
+ * permit semantics cannot drop a signal. The regression tests below assert
the wakeup is always
+ * delivered promptly, i.e. never stalls for the full interval.
+ */
public class ServiceThreadTest {
@Test
@@ -43,49 +64,170 @@ public class ServiceThreadTest {
ServiceThread testServiceThread = startTestServiceThread();
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
}
- @Test
+ @Test(timeout = 5000)
public void testWaitForRunning() {
ServiceThread testServiceThread = startTestServiceThread();
- // test waitForRunning
- testServiceThread.waitForRunning(1000);
+ // Not notified: returns after the (short) interval with the flag
cleared.
+ testServiceThread.waitForRunning(50);
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
- // test wake up
+ // wakeup() arms the notification.
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning
- testServiceThread.waitForRunning(1000);
- assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning again
- testServiceThread.waitForRunning(1000);
+ // The next waitForRunning() must consume the notification
immediately, never blocking for
+ // the (huge) interval -- this is exactly what the lost-wakeup race
used to break.
+ long begin = System.currentTimeMillis();
+ testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+ long elapsed = System.currentTimeMillis() - begin;
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
+ assertTrue("waitForRunning() must fast-path on a pending notification,
elapsed=" + elapsed + "ms",
+ elapsed < 1000);
}
- private ServiceThread startTestServiceThread() {
- return startTestServiceThread(false);
+ /**
+ * A single {@code wakeup()} must wake a long-interval wait almost
immediately, instead of
+ * letting it block for the full interval (which is what the lost-wakeup
race used to cause).
+ */
+ @Test(timeout = 5000)
+ public void testWakeupDeliveredPromptly() throws Exception {
+ TestServiceThread service = new TestServiceThread();
+ AtomicBoolean returned = new AtomicBoolean(false);
+ long longInterval = TimeUnit.SECONDS.toMillis(10);
+
+ Thread waiter = new Thread(() -> {
+ service.doWait(longInterval);
+ returned.set(true);
+ }, "waiter");
+ waiter.start();
+
+ // Let the waiter enter the park loop.
+ Thread.sleep(200);
+
+ long startNanos = System.nanoTime();
+ service.wakeup();
+ waiter.join(2000);
+ long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNanos);
+
+ assertTrue("waitForRunning() did not return after wakeup() within 2s,
elapsed=" + elapsedMs + "ms",
+ returned.get());
+ assertTrue("wake latency should be far below the interval, elapsed=" +
elapsedMs + "ms",
+ elapsedMs < 2000);
}
- private ServiceThread startTestServiceThread(boolean daemon) {
- ServiceThread testServiceThread = new ServiceThread() {
+ /**
+ * Hammer the exact pattern that triggered the lost-wakeup race: a {@code
wakeup()} fired right
+ * as the waiter is entering the wait. With the LockSupport-based
implementation no signal may be
+ * lost, so every iteration must return well within its (long) interval.
+ */
+ @Test(timeout = 60000)
+ public void testNoWakeupLostUnderStress() throws Exception {
+ int iterations = 1000;
+ long longInterval = TimeUnit.SECONDS.toMillis(5);
+ int lost = 0;
+
+ for (int i = 0; i < iterations; i++) {
+ TestServiceThread service = new TestServiceThread();
+ AtomicBoolean returned = new AtomicBoolean(false);
+
+ Thread waiter = new Thread(() -> {
+ service.doWait(longInterval);
+ returned.set(true);
+ }, "waiter-" + i);
+ waiter.start();
+
+ // Increase the chance the wakeup lands in the CAS-to-park window.
+ Thread.yield();
+ service.wakeup();
+
+ // With the fix the waiter returns in microseconds; a lost signal
would block for the
+ // full 5s interval, so a 2s join is more than enough to
distinguish the two.
+ waiter.join(2000);
+ if (!returned.get()) {
+ lost++;
+ waiter.interrupt();
+ waiter.join(1000);
+ }
+ }
+
+ assertEquals("ServiceThread must not lose any wakeup signal", 0, lost);
+ }
+
+ /**
+ * Single consumer draining {@code waitForRunning} in a tight loop while
several threads race to
+ * {@code wakeup()} it. A lost wakeup shows up as a wait that blocks for
the full interval.
+ */
+ @Test(timeout = 30000)
+ public void serviceThreadShouldNotLoseWakeupUnderStress() throws Exception
{
+ final int stressIterations = 10000;
+ final int wakerThreads = 4;
+ final long waitTimeoutMs = 20;
+ final long lostWakeupThresholdMs = 18;
+
+ StressServiceThread service = new StressServiceThread();
+ AtomicInteger activeIteration = new AtomicInteger(-1);
+ AtomicInteger completedIteration = new AtomicInteger(-1);
+ AtomicInteger lostWakeups = new AtomicInteger(0);
+ AtomicInteger maxElapsedMs = new AtomicInteger(0);
+ AtomicBoolean running = new AtomicBoolean(true);
+ AtomicReference<Throwable> failure = new AtomicReference<>();
+ ExecutorService executor = Executors.newFixedThreadPool(wakerThreads +
1);
+
+ try {
+ executor.submit(() -> {
+ try {
+ for (int i = 0; i < stressIterations; i++) {
+ activeIteration.set(i);
+ long elapsed = service.awaitOnce(waitTimeoutMs);
+ maxElapsedMs.accumulateAndGet((int) elapsed,
Math::max);
+ if (elapsed >= lostWakeupThresholdMs) {
+ lostWakeups.incrementAndGet();
+ running.set(false);
+ break;
+ }
+ completedIteration.set(i);
+ Thread.yield();
+ }
+ } catch (Throwable t) {
+ failure.compareAndSet(null, t);
+ } finally {
+ running.set(false);
+ }
+ });
- @Override
- public void run() {
- doNothing();
+ for (int w = 0; w < wakerThreads; w++) {
+ executor.submit(() -> {
+ while (running.get()) {
+ int iteration = activeIteration.get();
+ if (iteration >= 0 && completedIteration.get() <
iteration) {
+ service.wakeup();
+ }
+ Thread.yield();
+ }
+ });
}
- private void doNothing() {}
+ executor.shutdown();
+ assertTrue("stress test did not finish",
executor.awaitTermination(25, TimeUnit.SECONDS));
- @Override
- public String getServiceName() {
- return "TestServiceThread";
+ Throwable error = failure.get();
+ if (error != null) {
+ throw new AssertionError("stress test failed", error);
}
- };
+ assertEquals("ServiceThread lost wakeups under stress
(maxElapsedMs=" + maxElapsedMs.get() + ")",
+ 0, lostWakeups.get());
+ } finally {
+ running.set(false);
+ executor.shutdownNow();
+ }
+ }
+
+ private ServiceThread startTestServiceThread() {
+ return startTestServiceThread(false);
+ }
+
+ private ServiceThread startTestServiceThread(boolean daemon) {
+ ServiceThread testServiceThread = new TestServiceThread();
testServiceThread.setDaemon(daemon);
// test start
testServiceThread.start();
@@ -108,6 +250,43 @@ public class ServiceThreadTest {
}
assertEquals(true, testServiceThread.isStopped());
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
+ }
+
+ private static class TestServiceThread extends ServiceThread {
+
+ @Override
+ public void run() {
+ doNothing();
+ }
+
+ private void doNothing() {
+ }
+
+ @Override
+ public String getServiceName() {
+ return "TestServiceThread";
+ }
+
+ void doWait(long intervalMillis) {
+ waitForRunning(intervalMillis);
+ }
+ }
+
+ private static final class StressServiceThread extends ServiceThread {
+
+ @Override
+ public String getServiceName() {
+ return "StressServiceThread";
+ }
+
+ @Override
+ public void run() {
+ }
+
+ long awaitOnce(long intervalMillis) {
+ long begin = System.nanoTime();
+ waitForRunning(intervalMillis);
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
+ }
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 1c46f9e2ce..d2f2da8b7d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1792,9 +1792,7 @@ public class CommitLog implements Swappable {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
- if (hasNotified.compareAndSet(false, true)) {
- waitPoint.countDown(); // notify
- }
+ this.wakeup();
boolean flag = this.requestsWrite.size() >
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests();
if (flag) {