Copilot commented on code in PR #10544:
URL: https://github.com/apache/rocketmq/pull/10544#discussion_r3458468303
##########
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java:
##########
@@ -97,32 +97,49 @@ public void makeStop() {
return;
}
this.stopped = true;
+ // wake up the parked worker so it observes the stop flag promptly
+ wakeup();
log.info("makestop thread[{}] ", this.getServiceName());
}
public void wakeup() {
+ if (hasNotified.get()) {
+ return;
+ }
if (hasNotified.compareAndSet(false, true)) {
- waitPoint.countDown(); // notify
+ LockSupport.unpark(this.thread); // notify
}
}
protected void waitForRunning(long interval) {
+ // Publish the parking thread so wakeup() can target it (also handles restart).
+ this.thread = Thread.currentThread();
+
if (hasNotified.compareAndSet(true, false)) {
Review Comment:
`waitForRunning()` overwrites `this.thread` with the calling thread. If
`thread` is also used for lifecycle management elsewhere (e.g., `start()` sets
it, `shutdown()`/`join()` uses it), calling `waitForRunning()` from any
non-service thread can cause `wakeup()` to unpark the wrong thread and/or cause
shutdown to join the wrong thread. A safer approach is to keep `thread` as the
dedicated service thread reference and introduce a separate `volatile Thread
parkingThread` (updated in `waitForRunning`) for `LockSupport.unpark(...)`, or
ensure `thread` is only ever assigned in `start()`.
##########
common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java:
##########
@@ -43,49 +64,170 @@ public void testWakeup() {
ServiceThread testServiceThread = startTestServiceThread();
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
}
- @Test
+ @Test(timeout = 5000)
public void testWaitForRunning() {
ServiceThread testServiceThread = startTestServiceThread();
- // test waitForRunning
- testServiceThread.waitForRunning(1000);
+ // Not notified: returns after the (short) interval with the flag
cleared.
+ testServiceThread.waitForRunning(50);
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
- // test wake up
+ // wakeup() arms the notification.
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning
- testServiceThread.waitForRunning(1000);
- assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning again
- testServiceThread.waitForRunning(1000);
+ // The next waitForRunning() must consume the notification
immediately, never blocking for
+ // the (huge) interval -- this is exactly what the lost-wakeup race
used to break.
+ long begin = System.currentTimeMillis();
+ testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+ long elapsed = System.currentTimeMillis() - begin;
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
+ assertTrue("waitForRunning() must fast-path on a pending notification,
elapsed=" + elapsed + "ms",
+ elapsed < 1000);
}
- private ServiceThread startTestServiceThread() {
- return startTestServiceThread(false);
+ /**
+ * A single {@code wakeup()} must wake a long-interval wait almost
immediately, instead of
+ * letting it block for the full interval (which is what the lost-wakeup
race used to cause).
+ */
+ @Test(timeout = 5000)
+ public void testWakeupDeliveredPromptly() throws Exception {
+ TestServiceThread service = new TestServiceThread();
+ AtomicBoolean returned = new AtomicBoolean(false);
+ long longInterval = TimeUnit.SECONDS.toMillis(10);
+
+ Thread waiter = new Thread(() -> {
+ service.doWait(longInterval);
+ returned.set(true);
+ }, "waiter");
+ waiter.start();
+
+ // Let the waiter enter the park loop.
+ Thread.sleep(200);
+
+ long startNanos = System.nanoTime();
+ service.wakeup();
+ waiter.join(2000);
+ long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNanos);
+
+ assertTrue("waitForRunning() did not return after wakeup() within 2s,
elapsed=" + elapsedMs + "ms",
+ returned.get());
+ assertTrue("wake latency should be far below the interval, elapsed=" +
elapsedMs + "ms",
+ elapsedMs < 2000);
}
- private ServiceThread startTestServiceThread(boolean daemon) {
- ServiceThread testServiceThread = new ServiceThread() {
+ /**
+ * Hammer the exact pattern that triggered the lost-wakeup race: a {@code
wakeup()} fired right
+ * as the waiter is entering the wait. With the LockSupport-based
implementation no signal may be
+ * lost, so every iteration must return well within its (long) interval.
+ */
+ @Test(timeout = 60000)
+ public void testNoWakeupLostUnderStress() throws Exception {
+ int iterations = 1000;
+ long longInterval = TimeUnit.SECONDS.toMillis(5);
+ int lost = 0;
+
+ for (int i = 0; i < iterations; i++) {
+ TestServiceThread service = new TestServiceThread();
+ AtomicBoolean returned = new AtomicBoolean(false);
+
+ Thread waiter = new Thread(() -> {
+ service.doWait(longInterval);
+ returned.set(true);
+ }, "waiter-" + i);
+ waiter.start();
+
+ // Increase the chance the wakeup lands in the CAS-to-park window.
+ Thread.yield();
+ service.wakeup();
+
+ // With the fix the waiter returns in microseconds; a lost signal
would block for the
+ // full 5s interval, so a 2s join is more than enough to
distinguish the two.
+ waiter.join(2000);
+ if (!returned.get()) {
+ lost++;
+ waiter.interrupt();
+ waiter.join(1000);
+ }
+ }
+
+ assertEquals("ServiceThread must not lose any wakeup signal", 0, lost);
+ }
+
+ /**
+ * Single consumer draining {@code waitForRunning} in a tight loop while
several threads race to
+ * {@code wakeup()} it. A lost wakeup shows up as a wait that blocks for
the full interval.
+ */
+ @Test(timeout = 30000)
+ public void serviceThreadShouldNotLoseWakeupUnderStress() throws Exception {
+ final int stressIterations = 10000;
+ final int wakerThreads = 4;
+ final long waitTimeoutMs = 20;
+ final long lostWakeupThresholdMs = 18;
Review Comment:
These thresholds are extremely tight (20ms wait with an 18ms 'lost wakeup'
cutoff) and are likely to be flaky under normal CI variance (scheduler jitter,
GC pauses, timer granularity). To make the test robust, increase the interval
and use a much larger threshold (e.g., wait 200–1000ms and fail only if it
blocks close to the full interval), or structure the test to differentiate
'prompt wakeup' vs 'full-interval stall' using a long interval (seconds) and a
conservative cutoff (e.g., < 500ms).
##########
common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java:
##########
@@ -43,49 +64,170 @@ public void testWakeup() {
ServiceThread testServiceThread = startTestServiceThread();
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
}
- @Test
+ @Test(timeout = 5000)
public void testWaitForRunning() {
ServiceThread testServiceThread = startTestServiceThread();
- // test waitForRunning
- testServiceThread.waitForRunning(1000);
+ // Not notified: returns after the (short) interval with the flag
cleared.
+ testServiceThread.waitForRunning(50);
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
- // test wake up
+ // wakeup() arms the notification.
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning
- testServiceThread.waitForRunning(1000);
- assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning again
- testServiceThread.waitForRunning(1000);
+ // The next waitForRunning() must consume the notification
immediately, never blocking for
+ // the (huge) interval -- this is exactly what the lost-wakeup race
used to break.
+ long begin = System.currentTimeMillis();
+ testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+ long elapsed = System.currentTimeMillis() - begin;
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
+ assertTrue("waitForRunning() must fast-path on a pending notification,
elapsed=" + elapsed + "ms",
+ elapsed < 1000);
}
- private ServiceThread startTestServiceThread() {
- return startTestServiceThread(false);
+ /**
+ * A single {@code wakeup()} must wake a long-interval wait almost
immediately, instead of
+ * letting it block for the full interval (which is what the lost-wakeup
race used to cause).
+ */
+ @Test(timeout = 5000)
+ public void testWakeupDeliveredPromptly() throws Exception {
+ TestServiceThread service = new TestServiceThread();
+ AtomicBoolean returned = new AtomicBoolean(false);
+ long longInterval = TimeUnit.SECONDS.toMillis(10);
+
+ Thread waiter = new Thread(() -> {
+ service.doWait(longInterval);
+ returned.set(true);
+ }, "waiter");
+ waiter.start();
+
+ // Let the waiter enter the park loop.
+ Thread.sleep(200);
+
+ long startNanos = System.nanoTime();
+ service.wakeup();
+ waiter.join(2000);
+ long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNanos);
+
+ assertTrue("waitForRunning() did not return after wakeup() within 2s,
elapsed=" + elapsedMs + "ms",
+ returned.get());
+ assertTrue("wake latency should be far below the interval, elapsed=" +
elapsedMs + "ms",
+ elapsedMs < 2000);
}
- private ServiceThread startTestServiceThread(boolean daemon) {
- ServiceThread testServiceThread = new ServiceThread() {
+ /**
+ * Hammer the exact pattern that triggered the lost-wakeup race: a {@code
wakeup()} fired right
+ * as the waiter is entering the wait. With the LockSupport-based
implementation no signal may be
+ * lost, so every iteration must return well within its (long) interval.
+ */
+ @Test(timeout = 60000)
+ public void testNoWakeupLostUnderStress() throws Exception {
+ int iterations = 1000;
+ long longInterval = TimeUnit.SECONDS.toMillis(5);
+ int lost = 0;
+
+ for (int i = 0; i < iterations; i++) {
+ TestServiceThread service = new TestServiceThread();
+ AtomicBoolean returned = new AtomicBoolean(false);
+
+ Thread waiter = new Thread(() -> {
+ service.doWait(longInterval);
+ returned.set(true);
+ }, "waiter-" + i);
+ waiter.start();
Review Comment:
This test creates 1000 new JVM threads (one per iteration) in a tight loop,
which can significantly slow the suite and increase flakiness on constrained CI
agents. Consider reusing a small executor / a single reusable waiter thread with
per-iteration coordination (e.g., `CountDownLatch`/`CyclicBarrier`) or reduce
the iteration count while keeping the 'long-interval stall' signal strong.
##########
common/src/test/java/org/apache/rocketmq/common/ServiceThreadTest.java:
##########
@@ -43,49 +64,170 @@ public void testWakeup() {
ServiceThread testServiceThread = startTestServiceThread();
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
}
- @Test
+ @Test(timeout = 5000)
public void testWaitForRunning() {
ServiceThread testServiceThread = startTestServiceThread();
- // test waitForRunning
- testServiceThread.waitForRunning(1000);
+ // Not notified: returns after the (short) interval with the flag
cleared.
+ testServiceThread.waitForRunning(50);
assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(1, testServiceThread.waitPoint.getCount());
- // test wake up
+ // wakeup() arms the notification.
testServiceThread.wakeup();
assertEquals(true, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning
- testServiceThread.waitForRunning(1000);
- assertEquals(false, testServiceThread.hasNotified.get());
- assertEquals(0, testServiceThread.waitPoint.getCount());
- // repeat waitForRunning again
- testServiceThread.waitForRunning(1000);
+ // The next waitForRunning() must consume the notification
immediately, never blocking for
+ // the (huge) interval -- this is exactly what the lost-wakeup race
used to break.
+ long begin = System.currentTimeMillis();
+ testServiceThread.waitForRunning(TimeUnit.MINUTES.toMillis(1));
+ long elapsed = System.currentTimeMillis() - begin;
Review Comment:
For measuring elapsed time in tests, `System.nanoTime()` is preferable to
`System.currentTimeMillis()`: `nanoTime()` is monotonic, whereas
`currentTimeMillis()` follows the wall clock and can jump when the system time
is adjusted. Using `nanoTime()` here will avoid rare failures caused by clock
changes or time sync events.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]