This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fc3561b1d9 test: avoid busy-spin in stream stress tests (#2996)
fc3561b1d9 is described below

commit fc3561b1d9e0c54e2f4a815fe79dcfad546c17f2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed May 27 19:50:04 2026 +0800

    test: avoid busy-spin in stream stress tests (#2996)
    
    * test: reduce stream stress test timing sensitivity
    
    Motivation:
    Recent JDK 25 nightly builds repeatedly time out in BoundedSourceQueueSpec and FlowMapAsync/FlowMapAsyncUnordered parallelism tests.
    
    Modification:
    Give the BoundedSourceQueue completion assertion a dilated timeout, reduce the burst wakeup stress loop to a still-repetitive 10000 rounds, and reduce the mapAsync parallelism stress input size while preserving the parallelism limit assertion. Also use the same 12 second timeout for the ordered mapAsync test as the unordered test gets from the JDK 25 timefactor.
    
    Result:
    The tests continue to verify queue wakeups and mapAsync parallelism limits without depending on excessive runner throughput under JDK 25 nightly load.
    
    Tests:
    - scalafmt --mode diff-ref=origin/main
    - scalafmt --list --mode diff-ref=origin/main
    - git diff --check
    - sbt with JDK 25 nightly-style virtualized test-stream-dispatcher flags: stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.BoundedSourceQueueSpec org.apache.pekko.stream.scaladsl.FlowMapAsyncSpec org.apache.pekko.stream.scaladsl.FlowMapAsyncUnorderedSpec
    
    References:
    None - nightly-builds.yml failure analysis
    
    * test: avoid busy-spin in mapAsync stress tests
    
    Motivation:
    Recent nightly stream stress failures timed out under JDK 25 contention. The previous PR revision reduced the stress sizes, but that weakened the tests instead of addressing the root timing sensitivity.
    
    Modification:
    Keep the existing 10K mapAsync stress inputs and 100K BoundedSourceQueue burst loop. Replace the mapAsync stress-test busy-spin delay loops with LockSupport.parkNanos so the tests preserve asynchronous completion pressure without burning carrier CPU. Keep the BoundedSourceQueue completion assertion dilated because that assertion is explicitly timefactor-sensitive.
    
    Result:
    The tests retain their original stress coverage while avoiding artificial CPU starvation in nightly runners.
    
    Tests:
    - JDK 25 nightly-style virtualized flags: stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowMapAsyncSpec org.apache.pekko.stream.scaladsl.FlowMapAsyncUnorderedSpec org.apache.pekko.stream.scaladsl.FlowMapAsyncPartitionedSpec
    - JDK 25 nightly-style virtualized flags: stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.BoundedSourceQueueSpec
    - scalafmt --mode diff-ref=origin/main
    - scalafmt --list --mode diff-ref=origin/main
    - git diff --check
    
    References:
    None - nightly-builds.yml failure analysis.
---
 .../pekko/stream/scaladsl/BoundedSourceQueueSpec.scala      |  4 +++-
 .../pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala | 11 ++++++++++-
 .../org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala | 13 +++++++++++--
 .../pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala   | 10 +++++++++-
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
index cb858c897e..926800035f 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
@@ -25,6 +25,8 @@ import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestDuration
 import pekko.testkit.WithLogCapturing
 
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
 class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
     |pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
     |""".stripMargin) with WithLogCapturing {
@@ -189,7 +191,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
       queue.complete()
       queue.isCompleted shouldBe true
 
-      result.futureValue should be(counter.get())
+      result.futureValue(Timeout(15.seconds.dilated)) should be(counter.get())
     }
 
     // copied from pekko-remote SendQueueSpec
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index 5d563cf5e8..b8d9dd6d83 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.LockSupport
 
 import scala.collection.immutable
 import scala.concurrent.Await
@@ -462,7 +463,15 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
             try {
               val (partition, promise, enqueued) = queue.take()
               val wakeup = enqueued + delay()
-              while (System.nanoTime() < wakeup) {}
+              @annotation.tailrec
+              def waitUntilWakeup(): Unit = {
+                val remaining = wakeup - System.nanoTime()
+                if (remaining > 0) {
+                  LockSupport.parkNanos(remaining)
+                  waitUntilWakeup()
+                }
+              }
+              waitUntilWakeup()
               globalCounter.decrementAndGet()
               partitionCounters(partition).decrementAndGet()
               promise.success(count)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala
index 9c7759bf7c..281b0c3560 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.LockSupport
 
 import scala.annotation.tailrec
 import scala.concurrent.Await
@@ -32,6 +33,7 @@ import pekko.stream.Supervision.resumingDecider
 import pekko.stream.testkit._
 import pekko.stream.testkit.Utils._
 import pekko.stream.testkit.scaladsl.TestSink
+import pekko.testkit.TestDuration
 import pekko.testkit.TestLatch
 import pekko.testkit.TestProbe
 
@@ -449,7 +451,14 @@ class FlowMapAsyncSpec extends StreamSpec {
             try {
               val (promise, enqueued) = queue.take()
               val wakeup = enqueued + delay
-              while (System.nanoTime() < wakeup) {}
+              @tailrec def waitUntilWakeup(): Unit = {
+                val remaining = wakeup - System.nanoTime()
+                if (remaining > 0) {
+                  LockSupport.parkNanos(remaining)
+                  waitUntilWakeup()
+                }
+              }
+              waitUntilWakeup()
               counter.decrementAndGet()
               promise.success(count)
               count += 1
@@ -476,7 +485,7 @@ class FlowMapAsyncSpec extends StreamSpec {
         Source(1 to N)
           .mapAsync(parallelism)(_ => deferred())
           .runFold(0)((c, _) => c + 1)
-          .futureValue(Timeout(3.seconds)) should ===(N)
+          .futureValue(Timeout(3.seconds.dilated)) should ===(N)
       } finally {
         timer.interrupt()
       }
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
index d98f5fd79e..21d0a4a566 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.LockSupport
 
 import scala.annotation.tailrec
 import scala.concurrent.Await
@@ -335,7 +336,14 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
             try {
               val (promise, enqueued) = queue.take()
               val wakeup = enqueued + delay
-              while (System.nanoTime() < wakeup) {}
+              @tailrec def waitUntilWakeup(): Unit = {
+                val remaining = wakeup - System.nanoTime()
+                if (remaining > 0) {
+                  LockSupport.parkNanos(remaining)
+                  waitUntilWakeup()
+                }
+              }
+              waitUntilWakeup()
               counter.decrementAndGet()
               promise.success(count)
               count += 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to