ShadowySpirits opened a new issue, #10543:
URL: https://github.com/apache/rocketmq/issues/10543
### Before Creating the Bug Report

- [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

OS: any

### RocketMQ version

5.x

### JDK Version

JDK 8

### Describe the Bug

`ServiceThread#wakeup()` and `ServiceThread#waitForRunning()` coordinate through an `AtomicBoolean hasNotified` and a resettable latch, `CountDownLatch2 waitPoint`:

```java
public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) { // fast path
        this.onWaitEnd();
        return;
    }

    waitPoint.reset(); // <-- race point

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
        Thread.currentThread().interrupt();
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
```

There is a race between `waitPoint.countDown()` in `wakeup()` and `waitPoint.reset()` in `waitForRunning()`. If a `wakeup()` lands after the fast-path CAS has already returned `false` but before `waitPoint.reset()` executes, the `countDown()` is overwritten by `reset()`, and the worker then blocks on `await(interval)` for the full interval (default `1000ms`) even though it was already notified. The wakeup is effectively swallowed.

### Steps to Reproduce

Interleaving of worker thread A (running `waitForRunning(1000)`) and caller thread B (calling `wakeup()`). Initial state: `hasNotified=false`, `waitPoint` count = 1.
| Step | Thread A (`waitForRunning`) | Thread B (`wakeup`) | State |
|------|------------------------------|----------------------|-------|
| 1 | `hasNotified.CAS(true,false)` → `false`, skip fast path | | `hasNotified=false`, count=1 |
| 2 | (about to call `reset()`) | `hasNotified.CAS(false,true)` → `true`; `waitPoint.countDown()` | `hasNotified=true`, count=0 |
| 3 | `waitPoint.reset()` → count restored to 1 (**countDown discarded**) | | `hasNotified=true`, count=1 |
| 4 | `waitPoint.await(1000)` blocks; no further `countDown` is coming, because every later `wakeup()` fails its CAS while `hasNotified=true` | | blocks ~1000ms |
| 5 | after timeout: `finally` sets `hasNotified=false`, `onWaitEnd()` | | wakeup lost |

A deterministic reproduction is included in the repo (`ServiceThreadTest`): it injects a `CountDownLatch2` whose `reset()` blocks until a `wakeup()` has been delivered, forcing the exact interleaving above, then asserts that the wait returned promptly.

### What Did You Expect to See?

A `wakeup()` that happens concurrently with entering `waitForRunning()` should cause the wait to return (almost) immediately: the thread should react to the notification, not sleep for the whole `interval`.

### What Did You See Instead?

The `countDown()` is wiped out by `reset()`, so `await(interval)` sleeps the full interval (≈1000ms by default). The wakeup is delayed by up to one interval, which adds latency to every service thread that relies on `wakeup()` for a prompt reaction (flush services, reput/dispatch, HA, etc.). Under bursty notification patterns this manifests as periodic ~1s stalls.

### Additional Context

_No response_
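### Standalone Sketch of the Race

To observe the interleaving from "Steps to Reproduce" without the RocketMQ code base, here is a minimal, self-contained sketch. It is a simplified model under stated assumptions, not the repo's `ServiceThreadTest`: `ResettableLatch` is a hypothetical monitor-based stand-in for `CountDownLatch2` (count 1), `Worker` copies only the quoted `hasNotified`/`waitPoint` logic (no `onWaitEnd()`, no logging), and the hypothetical `GatedLatch` forces the race in the way the issue describes the repo test doing it, with `reset()` blocking until thread B has finished `wakeup()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SwallowedWakeupDemo {

    /** Hypothetical stand-in for RocketMQ's CountDownLatch2 with an initial count of 1. */
    static class ResettableLatch {
        private int count = 1;
        synchronized void countDown() {
            if (count > 0 && --count == 0) {
                notifyAll(); // release a thread blocked in await()
            }
        }

        synchronized void reset() {
            count = 1; // silently discards a countDown() that already happened
        }
        /** Returns true if the count reached zero, false if the timeout elapsed first. */
        synchronized boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (count > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) return false;
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        }
    }

    /** Hypothetical test latch: reset() blocks (outside the monitor) until B has finished wakeup(). */
    static class GatedLatch extends ResettableLatch {
        final CountDownLatch resetReached = new CountDownLatch(1);
        final CountDownLatch wakeupDelivered = new CountDownLatch(1);

        @Override
        void reset() {
            resetReached.countDown(); // step 1 done: A skipped the fast path
            try {
                wakeupDelivered.await(); // step 2 happens on thread B meanwhile
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            super.reset(); // step 3: the count goes back to 1
        }
    }

    /** Simplified copy of the quoted wakeup()/waitForRunning() logic (no onWaitEnd(), no logging). */
    static class Worker {
        final AtomicBoolean hasNotified = new AtomicBoolean(false);
        final ResettableLatch waitPoint;
        Worker(ResettableLatch waitPoint) { this.waitPoint = waitPoint; }
        void wakeup() {
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown();
            }
        }
        void waitForRunning(long interval) {
            if (hasNotified.compareAndSet(true, false)) {
                return; // fast path
            }
            waitPoint.reset(); // race point
            try {
                waitPoint.await(interval, TimeUnit.MILLISECONDS); // step 4
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                hasNotified.set(false); // step 5
            }
        }
    }

    /** Runs the interleaving and returns how long waitForRunning() blocked, in milliseconds. */
    static long runRace(long intervalMs) {
        GatedLatch latch = new GatedLatch();
        Worker worker = new Worker(latch);
        Thread threadB = new Thread(() -> {
            try {
                latch.resetReached.await();
                worker.wakeup(); // CAS succeeds and the count drops to 0
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.wakeupDelivered.countDown(); // let A's reset() proceed
            }
        });
        threadB.start();
        long start = System.nanoTime();
        worker.waitForRunning(intervalMs);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("waitForRunning blocked " + runRace(1000) + " ms despite a wakeup()");
    }
}
```

In this model `runRace(interval)` always returns at least `interval` milliseconds: the only `countDown()` happens before `reset()`, so `await` can end only by timing out. Because `hasNotified` stays `true` for the whole `await`, any extra `wakeup()` issued during step 4 would also fail its CAS, which is why nothing but the timeout ends the wait.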
