This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fc3561b1d9 test: avoid busy-spin in stream stress tests (#2996)
fc3561b1d9 is described below
commit fc3561b1d9e0c54e2f4a815fe79dcfad546c17f2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed May 27 19:50:04 2026 +0800
test: avoid busy-spin in stream stress tests (#2996)
* test: reduce stream stress test timing sensitivity
Motivation:
Recent JDK 25 nightly builds repeatedly time out in BoundedSourceQueueSpec
and FlowMapAsync/FlowMapAsyncUnordered parallelism tests.
Modification:
Give the BoundedSourceQueue completion assertion a dilated timeout, reduce
the burst wakeup stress loop to a still-repetitive 10000 rounds, and reduce the
mapAsync parallelism stress input size while preserving the parallelism limit
assertion. Also give the ordered mapAsync test the same 12-second timeout that
the unordered test already gets from the JDK 25 timefactor.
Result:
The tests continue to verify queue wakeups and mapAsync parallelism limits
without depending on excessive runner throughput under JDK 25 nightly load.
Tests:
- scalafmt --mode diff-ref=origin/main
- scalafmt --list --mode diff-ref=origin/main
- git diff --check
- sbt with JDK 25 nightly-style virtualized test-stream-dispatcher flags:
stream-tests / Test / testOnly
org.apache.pekko.stream.scaladsl.BoundedSourceQueueSpec
org.apache.pekko.stream.scaladsl.FlowMapAsyncSpec
org.apache.pekko.stream.scaladsl.FlowMapAsyncUnorderedSpec
References:
None - nightly-builds.yml failure analysis
* test: avoid busy-spin in mapAsync stress tests
Motivation:
Recent nightly stream stress failures timed out under JDK 25 contention.
The previous PR revision reduced the stress sizes, but that weakened the tests
instead of addressing the root timing sensitivity.
Modification:
Keep the existing 10K mapAsync stress inputs and 100K BoundedSourceQueue
burst loop. Replace the mapAsync stress-test busy-spin delay loops with
LockSupport.parkNanos so the tests preserve asynchronous completion pressure
without burning carrier CPU. Keep the BoundedSourceQueue completion assertion
dilated because that assertion is explicitly timefactor-sensitive.
Result:
The tests retain their original stress coverage while avoiding artificial
CPU starvation in nightly runners.
Tests:
- JDK 25 nightly-style virtualized flags: stream-tests / Test / testOnly
org.apache.pekko.stream.scaladsl.FlowMapAsyncSpec
org.apache.pekko.stream.scaladsl.FlowMapAsyncUnorderedSpec
org.apache.pekko.stream.scaladsl.FlowMapAsyncPartitionedSpec
- JDK 25 nightly-style virtualized flags: stream-tests / Test / testOnly
org.apache.pekko.stream.scaladsl.BoundedSourceQueueSpec
- scalafmt --mode diff-ref=origin/main
- scalafmt --list --mode diff-ref=origin/main
- git diff --check
References:
None - nightly-builds.yml failure analysis.
---
.../pekko/stream/scaladsl/BoundedSourceQueueSpec.scala | 4 +++-
.../pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala | 11 ++++++++++-
.../org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala | 13 +++++++++++--
.../pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala | 10 +++++++++-
4 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
index cb858c897e..926800035f 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/BoundedSourceQueueSpec.scala
@@ -25,6 +25,8 @@ import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestDuration
import pekko.testkit.WithLogCapturing
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
|pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
|""".stripMargin) with WithLogCapturing {
@@ -189,7 +191,7 @@ class BoundedSourceQueueSpec extends StreamSpec("""pekko.loglevel = debug
queue.complete()
queue.isCompleted shouldBe true
- result.futureValue should be(counter.get())
+ result.futureValue(Timeout(15.seconds.dilated)) should be(counter.get())
}
// copied from pekko-remote SendQueueSpec
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
index 5d563cf5e8..b8d9dd6d83 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncPartitionedSpec.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.LockSupport
import scala.collection.immutable
import scala.concurrent.Await
@@ -462,7 +463,15 @@ class FlowMapAsyncPartitionedSpec extends StreamSpec with WithLogCapturing {
try {
val (partition, promise, enqueued) = queue.take()
val wakeup = enqueued + delay()
- while (System.nanoTime() < wakeup) {}
+ @annotation.tailrec
+ def waitUntilWakeup(): Unit = {
+ val remaining = wakeup - System.nanoTime()
+ if (remaining > 0) {
+ LockSupport.parkNanos(remaining)
+ waitUntilWakeup()
+ }
+ }
+ waitUntilWakeup()
globalCounter.decrementAndGet()
partitionCounters(partition).decrementAndGet()
promise.success(count)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala
index 9c7759bf7c..281b0c3560 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncSpec.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.LockSupport
import scala.annotation.tailrec
import scala.concurrent.Await
@@ -32,6 +33,7 @@ import pekko.stream.Supervision.resumingDecider
import pekko.stream.testkit._
import pekko.stream.testkit.Utils._
import pekko.stream.testkit.scaladsl.TestSink
+import pekko.testkit.TestDuration
import pekko.testkit.TestLatch
import pekko.testkit.TestProbe
@@ -449,7 +451,14 @@ class FlowMapAsyncSpec extends StreamSpec {
try {
val (promise, enqueued) = queue.take()
val wakeup = enqueued + delay
- while (System.nanoTime() < wakeup) {}
+ @tailrec def waitUntilWakeup(): Unit = {
+ val remaining = wakeup - System.nanoTime()
+ if (remaining > 0) {
+ LockSupport.parkNanos(remaining)
+ waitUntilWakeup()
+ }
+ }
+ waitUntilWakeup()
counter.decrementAndGet()
promise.success(count)
count += 1
@@ -476,7 +485,7 @@ class FlowMapAsyncSpec extends StreamSpec {
Source(1 to N)
.mapAsync(parallelism)(_ => deferred())
.runFold(0)((c, _) => c + 1)
- .futureValue(Timeout(3.seconds)) should ===(N)
+ .futureValue(Timeout(3.seconds.dilated)) should ===(N)
} finally {
timer.interrupt()
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
index d98f5fd79e..21d0a4a566 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.LockSupport
import scala.annotation.tailrec
import scala.concurrent.Await
@@ -335,7 +336,14 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
try {
val (promise, enqueued) = queue.take()
val wakeup = enqueued + delay
- while (System.nanoTime() < wakeup) {}
+ @tailrec def waitUntilWakeup(): Unit = {
+ val remaining = wakeup - System.nanoTime()
+ if (remaining > 0) {
+ LockSupport.parkNanos(remaining)
+ waitUntilWakeup()
+ }
+ }
+ waitUntilWakeup()
counter.decrementAndGet()
promise.success(count)
count += 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org