This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a52d0cfa1 fix: restore acquire semantics for AbstractNodeQueue node reads (#3007)
6a52d0cfa1 is described below

commit 6a52d0cfa19d5988fcd567bea088931becfb05b1
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 19:02:46 2026 +0800

    fix: restore acquire semantics for AbstractNodeQueue node reads (#3007)
    
    * fix: restore acquire semantics for AbstractNodeQueue node reads
    
    Motivation:
    JDK 25 nightly stream tests hang for the full test timeout (the
    recurring failures behind #2573 / #2870). A local reproduction (the
    full stream-tests run with the nightly virtualize=on + timefactor=4
    options on JDK 25) pins it down: one
    `...-pekko.test.stream-dispatcher-CarrierThread-N` consumes ~97% CPU
    (CPU time approximately equal to elapsed time) stuck in
    `AbstractNodeQueue.pollNode`, while every other carrier is idle in
    `ForkJoinPool.awaitWork` and a full virtual-thread dump shows no
    producer thread anywhere. The spinning consumer is a virtual thread,
    so the unbounded CPU spin pins its carrier permanently; the stream
    never progresses and the test's `futureValue` never completes. Every
    affected test passes in isolation (~100ms) even with the full nightly
    JVM options, because the hang only appears under sustained load: it is
    a JIT-state-dependent data race.
    
    Root cause: PR #1990 (avoid sun.misc.Unsafe by using VarHandles)
    mapped the producer writes correctly (`Unsafe.putOrderedObject` ->
    `VarHandle.setRelease`) but downgraded every consumer read from
    `Unsafe.getObjectVolatile` (a volatile load) to `VarHandle.get`, a
    plain load: `VarHandle.get` has plain semantics even when the field is
    declared `volatile`. A plain read is not ordered against the
    producer's release store, so it establishes no happens-before with the
    published node. Inside the busy-spin loops in `peekNode`/`pollNode`
    (`do { next = tail.next(); } while (next == null);`) the JIT may also
    hoist the plain load out of the loop, producing an unbounded spin that
    never observes the linked next node. JDK 25's C2 makes this manifest
    reliably, and on virtual-thread carriers the spin becomes a permanent
    100% CPU pin of the carrier thread.
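
    The release/acquire pairing described above can be shown in a minimal
    standalone sketch. `ReleaseAcquireDemo` and its methods are
    hypothetical and are not Pekko code. The writer publishes with
    `setRelease`, and the reader repeats `getAcquire` inside its spin loop.
    Replacing `getAcquire` with `get` would restore the hoistable plain-load
    spin that this commit fixes. As noted above, whether that spin actually
    hangs depends on JIT state.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical demo class (not Pekko code): one writer publishes a node with a
// release store, one reader spins on an acquire load until it sees it.
class ReleaseAcquireDemo {
    static final class Node {
        int value; // plain, non-final field: its visibility comes from release/acquire
        Node(int value) { this.value = value; }
    }

    // Declared volatile, but a VarHandle picks the access mode per call:
    // SLOT.get(this) would still be a plain load.
    private volatile Node slot;

    private static final VarHandle SLOT;
    static {
        try {
            SLOT = MethodHandles.lookup().findVarHandle(ReleaseAcquireDemo.class, "slot", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Release store: writes made before it (e.g. Node.value) are visible to any
    // thread whose acquire load reads this reference.
    void publish(Node n) {
        SLOT.setRelease(this, n);
    }

    // The acquire load is re-executed on every iteration. Unlike a plain load, it
    // must eventually observe the published value, so it cannot be hoisted.
    Node awaitPublished() {
        Node n;
        while ((n = (Node) SLOT.getAcquire(this)) == null) {
            Thread.onSpinWait();
        }
        return n;
    }

    static int runOnce(int payload) {
        ReleaseAcquireDemo demo = new ReleaseAcquireDemo();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> seen[0] = demo.awaitPublished().value);
        reader.start();
        demo.publish(new Node(payload));
        try {
            reader.join(); // join orders the reader's write to seen[0] before our read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnce(42)); // prints 42
    }
}
```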
    
    Modification:
    - `Node.next()` and the four `tailHandle` reads (`peekNode`,
      `pollNode`, `isEmpty`, `count`) now use `getAcquire`, pairing with
      the existing `setRelease` writes. This re-establishes the
      happens-before and prevents the JIT from hoisting the loads out of
      the spin loops. `getAcquire` (not `getVolatile`) is used because the
      release/acquire pairing is exactly what is required here; for a load
      it compiles to the same instruction as a volatile load on x86-64
      (MOV) and AArch64 (LDAR), so the stronger sequential consistency
      would add no value.
    - Added `Thread.onSpinWait()` to both busy-spin loops: it hints to the
      CPU that the thread is busy-waiting, reducing spin power and pipeline
      cost and yielding the core to an SMT sibling (which may be the
      producer linking the next node).
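
    The spin-wait shape of `peekNode`/`pollNode` can be sketched as a
    simplified multi-producer/single-consumer queue. `MiniNodeQueue` and its
    methods are hypothetical and only approximate the structure suggested
    by the comments in this diff. The producer side (swap the head, then
    link `next` with a release store) is an assumption, because `add` does
    not appear in the diff. This is not the Pekko implementation.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified MPSC queue (not Pekko code): producers swap head, then
// publish the next link with setRelease; one consumer spins on an acquire load.
class MiniNodeQueue<T> {
    static final class Node<T> {
        T value;
        private volatile Node<T> next;
        private static final VarHandle NEXT;
        static {
            try {
                NEXT = MethodHandles.lookup().findVarHandle(Node.class, "next", Node.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
        Node(T value) { this.value = value; }

        @SuppressWarnings("unchecked")
        Node<T> next() { return (Node<T>) NEXT.getAcquire(this); } // pairs with setNext

        void setNext(Node<T> n) { NEXT.setRelease(this, n); }
    }

    private final AtomicReference<Node<T>> head; // swapped by producers
    private Node<T> tail;                        // owned by the single consumer

    MiniNodeQueue() {
        Node<T> stub = new Node<>(null);
        head = new AtomicReference<>(stub);
        tail = stub;
    }

    // Any thread may call add.
    void add(T value) {
        Node<T> n = new Node<>(value);
        // After getAndSet and before setNext, the previous head has no next link yet;
        // the consumer's spin loop waits out exactly this window.
        head.getAndSet(n).setNext(n);
    }

    // Only one thread may call poll. Returns null if the queue is empty.
    T poll() {
        Node<T> next = tail.next();
        if (next == null && head.get() != tail) {
            // A producer swapped head but has not linked next yet: re-run the acquire
            // load each iteration, hinting the CPU that this is a busy-wait.
            do {
                Thread.onSpinWait();
                next = tail.next();
            } while (next == null);
        }
        if (next == null) return null;
        T v = next.value;
        next.value = null; // next becomes the new stub
        tail = next;
        return v;
    }

    // Demo: several producers, one consumer; returns the sum of everything consumed.
    static long run(int producers, int perProducer) {
        MiniNodeQueue<Integer> q = new MiniNodeQueue<>();
        Thread[] ts = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            ts[p] = new Thread(() -> {
                for (int i = 1; i <= perProducer; i++) q.add(i);
            });
            ts[p].start();
        }
        long sum = 0;
        for (int received = 0; received < producers * perProducer; ) {
            Integer v = q.poll();
            if (v != null) { sum += v; received++; } else Thread.onSpinWait();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 200020000
    }
}
```

    `run` adds up everything one consumer drains from several producers, so
    a lost or duplicated element would change the total.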
    
    Method signatures are unchanged, so there is no binary-compatibility
    impact.
    
    Result:
    With the fix, the previously hanging
    `HubSpec "work with long streams if one of the producers is slower"`
    completes in ~2.7s (was stuck for the full timeout) and the full
    stream-tests run proceeds past the point where it previously hung
    (1800+ tests, no hang) under the same nightly virtualize=on +
    timefactor=4 JVM options on JDK 25.
    
    References: https://github.com/apache/pekko/issues/2870
    
    * fix: restore acquire/volatile semantics for AbstractBoundedNodeQueue
    
    Motivation:
    AbstractBoundedNodeQueue is the bounded sibling of AbstractNodeQueue
    and was downgraded by the same VarHandle migration (#1990): the
    consumer reads `Node.next()`, `getEnq()` and `getDeq()` became plain
    `VarHandle.get` loads, where the Unsafe original used
    `getObjectVolatile`. A plain read carries no ordering even on a
    volatile field, so it establishes no happens-before with the
    producer's `setNext` (release) / `casEnq` publication and may be
    hoisted out of the `for(;;)` spin loops in `peekNode`/`pollNode`,
    producing an unbounded spin that never observes the linked node. This
    is the same failure mode that hangs AbstractNodeQueue on JDK 25
    virtual-thread carriers.
    
    Modification:
    - `Node.next()` now uses `getAcquire`, pairing with the `setRelease`
      write.
    - `getEnq()`/`getDeq()` now use `getVolatile`, restoring the original
      `getObjectVolatile` semantics for these CAS-published fields.
    - Added `Thread.onSpinWait()` to the `peekNode` busy-wait.
    
    For a load, `getAcquire`/`getVolatile` compile to the same instruction
    as a volatile load on x86-64 (MOV) and AArch64 (LDAR), so this restores
    the pre-#1990 memory semantics at no additional cost. Method signatures
    are unchanged, so there is no binary-compatibility impact.
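
    For fields that are only ever replaced by CAS, such as `enq`/`deq` in
    the bounded queue, the read side can be sketched with `getVolatile`
    next to `compareAndSet`. `CasCursor` is a hypothetical illustration of
    that read/CAS retry pattern. It is not the bounded queue itself.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical example (not Pekko code): a shared pointer that is only ever
// replaced by CAS, similar in spirit to enq/deq in the bounded queue.
class CasCursor {
    // Immutable position object, published by CAS.
    static final class Pos {
        final long index;
        Pos(long index) { this.index = index; }
    }

    private volatile Pos pos = new Pos(0);

    private static final VarHandle POS;
    static {
        try {
            POS = MethodHandles.lookup().findVarHandle(CasCursor.class, "pos", Pos.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Volatile-mode read, the VarHandle counterpart of Unsafe.getObjectVolatile.
    Pos getPos() { return (Pos) POS.getVolatile(this); }

    // compareAndSet has volatile semantics and compares references.
    boolean casPos(Pos expected, Pos next) { return POS.compareAndSet(this, expected, next); }

    // Read/CAS retry loop: re-read the current value after every failed attempt.
    long advance() {
        for (;;) {
            Pos cur = getPos();
            Pos nxt = new Pos(cur.index + 1);
            if (casPos(cur, nxt)) return nxt.index;
        }
    }

    // Demo: several threads advance the cursor concurrently; returns the final index.
    static long run(int threads, int perThread) {
        CasCursor c = new CasCursor();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> { for (int i = 0; i < perThread; i++) c.advance(); });
            ts[t].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return c.getPos().index;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```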
    
    Result:
    The bounded queue can no longer spin forever on a node that a producer
    has appended but whose `next` link is not yet visible to the consumer.
    
    References: https://github.com/apache/pekko/issues/2870
---
 .../pekko/dispatch/AbstractBoundedNodeQueue.java   | 19 +++++++++++++---
 .../apache/pekko/dispatch/AbstractNodeQueue.java   | 26 ++++++++++++++++------
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java b/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java
index 361fa052e4..62f1ebdd13 100644
--- a/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java
+++ b/actor/src/main/java/org/apache/pekko/dispatch/AbstractBoundedNodeQueue.java
@@ -50,7 +50,10 @@ public abstract class AbstractBoundedNodeQueue<T> {
     }
 
     private Node<T> getEnq() {
-        return (Node<T>) enqHandle.get(this);
+        // Volatile read: this field is published across threads via casEnq; a plain read
+        // (VarHandle.get) carries no ordering even on a volatile field and may observe a
+        // stale node or be hoisted out of the spin loops in peekNode/pollNode.
+        return (Node<T>) enqHandle.getVolatile(this);
     }
 
     private boolean casEnq(Node<T> old, Node<T> nju) {
@@ -62,7 +65,8 @@ public abstract class AbstractBoundedNodeQueue<T> {
     }
 
     private Node<T> getDeq() {
-        return (Node<T>) deqHandle.get(this);
+        // Volatile read: see getEnq. Published across threads via casDeq.
+        return (Node<T>) deqHandle.getVolatile(this);
     }
 
     private boolean casDeq(Node<T> old, Node<T> nju) {
@@ -75,6 +79,9 @@ public abstract class AbstractBoundedNodeQueue<T> {
           final Node<T> next = deq.next();
           if (next != null || getEnq() == deq)
             return next;
+          // spinning until the producer that already advanced enq finishes linking next;
+          // onSpinWait reduces busy-wait power/pipeline cost and yields to an SMT sibling.
+          Thread.onSpinWait();
         }
     }
 
@@ -204,7 +211,13 @@ public abstract class AbstractBoundedNodeQueue<T> {
 
         @SuppressWarnings("unchecked")
         public final Node<T> next() {
-            return (Node<T>) nextHandle.get(this);
+            // Acquire load to pair with the release store in setNext: it establishes the
+            // happens-before that publishes the node and prevents the read from being hoisted
+            // out of the spin loops in peekNode/pollNode (a plain VarHandle.get carries no
+            // ordering even on a volatile field and may be hoisted, yielding an endless spin).
+            // getAcquire is sufficient here (release/acquire pairing) and is no costlier than
+            // getVolatile for a load, so the stronger sequential consistency is not needed.
         }
 
         protected final void setNext(final Node<T> newNext) {
diff --git a/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java b/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
index ac8921e155..61d95204bd 100644
--- a/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
+++ b/actor/src/main/java/org/apache/pekko/dispatch/AbstractNodeQueue.java
@@ -54,12 +54,15 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
      */
     @SuppressWarnings("unchecked")
     protected final Node<T> peekNode() {
-        final Node<T> tail = (Node<T>) tailHandle.get(this);
+        final Node<T> tail = (Node<T>) tailHandle.getAcquire(this);
         Node<T> next = tail.next();
         if (next == null && get() != tail) {
             // if tail != head this is not going to change until producer makes progress
-            // we can avoid reading the head and just spin on next until it shows up
+            // we can avoid reading the head and just spin on next until it shows up.
+            // onSpinWait hints the CPU we are busy-waiting: it cuts spin power/pipeline cost
+            // and yields the core to an SMT sibling (which may be the producer linking next).
             do {
+                Thread.onSpinWait();
                 next = tail.next();
             } while (next == null);
         }
@@ -110,7 +113,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
      * @return true if queue was empty at some point in the past
      */
     public final boolean isEmpty() {
-        return tailHandle.get(this) == get();
+        return tailHandle.getAcquire(this) == get();
     }
 
     /**
@@ -126,7 +129,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
     public final int count() {
         int count = 0;
         final Node<T> head = get();
-        for(Node<T> n = ((Node<T>) tailHandle.get(this)).next();
+        for(Node<T> n = ((Node<T>) tailHandle.getAcquire(this)).next();
             n != null && count < Integer.MAX_VALUE; 
             n = n.next()) {
           ++count;
@@ -162,12 +165,15 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
      */
     @SuppressWarnings("unchecked")
     public final Node<T> pollNode() {
-      final Node<T> tail = (Node<T>) tailHandle.get(this);
+      final Node<T> tail = (Node<T>) tailHandle.getAcquire(this);
       Node<T> next = tail.next();
       if (next == null && get() != tail) {
           // if tail != head this is not going to change until producer makes progress
-          // we can avoid reading the head and just spin on next until it shows up
+          // we can avoid reading the head and just spin on next until it shows up.
+          // onSpinWait hints the CPU we are busy-waiting: it cuts spin power/pipeline cost
+          // and yields the core to an SMT sibling (which may be the producer linking next).
           do {
+              Thread.onSpinWait();
               next = tail.next();
           } while (next == null);
       }
@@ -208,7 +214,13 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
         }
 
         public final Node<T> next() {
-            return (Node<T>) nextHandle.get(this);
+            // Acquire load to pair with the release store in setNext: it establishes the
+            // happens-before that publishes the node and prevents the read from being hoisted
+            // out of the spin loops in peekNode/pollNode (a plain VarHandle.get carries no
+            // ordering even on a volatile field and may be hoisted, yielding an endless spin).
+            // getAcquire is sufficient here (release/acquire pairing) and is no costlier than
+            // getVolatile for a load, so the stronger sequential consistency is not needed.
         }
 
         protected final void setNext(final Node<T> newNext) {

