This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 477149eefc test: await async alsoTo/wireTap side sink in context specs 
(#3006)
477149eefc is described below

commit 477149eefcc4df10ac9e8172484e9b769b113c5f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 17:22:19 2026 +0800

    test: await async alsoTo/wireTap side sink in context specs (#3006)
    
    Motivation:
    SourceWithContextSpec and FlowWithContextSpec fail on JDK 25 nightly
    with a real assertion error (not a timeout), e.g.:
    
      Vector(Message(A,1), Message(B,2), Message(D,3)) was not equal to
      Vector(Message(A,1), Message(B,2), Message(D,3), Message(C,4))
      (SourceWithContextSpec.scala:94)  *** FAILED *** (15 milliseconds)
    
    `alsoTo`/`alsoToContext`/`wireTap`/`wireTapContext` feed an
    asynchronous side Sink. The tests collected the side stream output
    into a buffer and asserted on it immediately after the main stream's
    `expectComplete()`. There is no happens-before between the main
    stream completing and the side Sink draining its last element, so the
    assertion can run before the side Sink has observed every element.
    The enclosing `within(10.seconds) { ... }` only bounds how long the
    block may take; it does not retry, so the buffer was read exactly
    once. On JDK 17 the side Sink usually drained first by luck; JDK 25's
    different scheduling surfaces the race as a hard failure in ~15ms.
    
    The shared `scala.collection.mutable.ListBuffer` was also written from
    the side-stream dispatcher thread and read from the test thread
    without synchronization, a second latent data race.
    
    Modification:
    - Collect side-stream output into a
      `java.util.concurrent.ConcurrentLinkedQueue` (thread-safe, preserves
      arrival order).
    - Replace the single `within(10.seconds) { assert }` with
      `awaitAssert(assert, 10.seconds)`, which polls until the async side
      Sink has caught up (or the deadline elapses).
    
    Result:
    The assertions now wait for the asynchronous side Sink to finish
    instead of sampling it once, removing both the timing race and the
    cross-thread buffer race. Both specs (24 tests) pass locally; the
    checks still fail if the side Sink genuinely drops data, since
    awaitAssert rethrows the last assertion error once the 10-second
    deadline elapses.
---
 .../stream/scaladsl/FlowWithContextSpec.scala      | 39 ++++++++++------------
 .../stream/scaladsl/SourceWithContextSpec.scala    | 39 ++++++++++------------
 2 files changed, 36 insertions(+), 42 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
index da9e22f9b9..fd27c27238 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
@@ -13,8 +13,10 @@
 
 package org.apache.pekko.stream.scaladsl
 
-import scala.collection.mutable.ListBuffer
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 import scala.util.control.NoStackTrace
 
 import org.apache.pekko
@@ -80,14 +82,17 @@ class FlowWithContextSpec extends StreamSpec {
     }
 
     "pass through all data when using alsoTo" in {
-      val listBuffer = new ListBuffer[String]()
+      // alsoTo feeds an asynchronous side Sink, which may still be draining 
when the
+      // main stream completes. Poll until it has observed every element 
instead of
+      // asserting once (the single assertion raced under JDK 25 scheduling).
+      val received = new ConcurrentLinkedQueue[String]()
       Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L)))
         .asSourceWithContext(_.offset)
         .via(
           FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), 
(String, Long)] { case (data, offset) =>
             (data.data.toLowerCase, offset)
           })
-            .alsoTo(Sink.foreach(string => listBuffer.+=(string)))
+            .alsoTo(Sink.foreach(string => received.add(string)))
         )
         .toMat(TestSink[(String, Long)]())(Keep.right)
         .run()
@@ -97,20 +102,18 @@ class FlowWithContextSpec extends StreamSpec {
         .expectNext(("d", 3L))
         .expectNext(("c", 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer should contain theSameElementsInOrderAs List("a", "b", 
"d", "c")
-        }
+      awaitAssert(received.asScala.toList should contain 
theSameElementsInOrderAs List("a", "b", "d", "c"), 10.seconds)
     }
 
     "pass through all data when using alsoToContext" in {
-      val listBuffer = new ListBuffer[Long]()
+      val received = new ConcurrentLinkedQueue[Long]()
       Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L)))
         .asSourceWithContext(_.offset)
         .via(
           FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), 
(String, Long)] { case (data, offset) =>
             (data.data.toLowerCase, offset)
           })
-            .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset)))
+            .alsoToContext(Sink.foreach(offset => received.add(offset)))
         )
         .toMat(TestSink[(String, Long)]())(Keep.right)
         .run()
@@ -120,19 +123,17 @@ class FlowWithContextSpec extends StreamSpec {
         .expectNext(("d", 3L))
         .expectNext(("c", 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer should contain theSameElementsInOrderAs List(1L, 2L, 3L, 
4L)
-        }
+      awaitAssert(received.asScala.toList should contain 
theSameElementsInOrderAs List(1L, 2L, 3L, 4L), 10.seconds)
     }
 
     "pass through all data when using wireTap" in {
-      val listBuffer = new ListBuffer[String]()
+      val received = new ConcurrentLinkedQueue[String]()
       Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L)))
         .asSourceWithContext(_.offset)
         .via(
           FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), 
(String, Long)] { case (data, offset) =>
             (data.data.toLowerCase, offset)
-          }).wireTap(Sink.foreach(string => listBuffer.+=(string)))
+          }).wireTap(Sink.foreach(string => received.add(string)))
         )
         .toMat(TestSink[(String, Long)]())(Keep.right)
         .run()
@@ -142,19 +143,17 @@ class FlowWithContextSpec extends StreamSpec {
         .expectNext(("d", 3L))
         .expectNext(("c", 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer should contain atLeastOneElementOf List("a", "b", "d", 
"c")
-        }
+      awaitAssert(received.asScala.toList should contain atLeastOneElementOf 
List("a", "b", "d", "c"), 10.seconds)
     }
 
     "pass through all data when using wireTapContext" in {
-      val listBuffer = new ListBuffer[Long]()
+      val received = new ConcurrentLinkedQueue[Long]()
       Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L)))
         .asSourceWithContext(_.offset)
         .via(
           FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), 
(String, Long)] { case (data, offset) =>
             (data.data.toLowerCase, offset)
-          }).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+          }).wireTapContext(Sink.foreach(offset => received.add(offset)))
         )
         .toMat(TestSink[(String, Long)]())(Keep.right)
         .run()
@@ -164,9 +163,7 @@ class FlowWithContextSpec extends StreamSpec {
         .expectNext(("d", 3L))
         .expectNext(("c", 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L)
-        }
+      awaitAssert(received.asScala.toList should contain atLeastOneElementOf 
List(1L, 2L, 3L, 4L), 10.seconds)
     }
 
     "keep the same order for data and context when using unsafeDataVia" in {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
index ff578d4629..02b9c9e535 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
@@ -13,8 +13,10 @@
 
 package org.apache.pekko.stream.scaladsl
 
-import scala.collection.mutable.ListBuffer
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
 import scala.util.control.NoStackTrace
 
 import org.apache.pekko
@@ -77,11 +79,14 @@ class SourceWithContextSpec extends StreamSpec {
     }
 
     "pass through all data when using alsoTo" in {
-      val listBuffer = new ListBuffer[Message]()
+      // alsoTo feeds an asynchronous side Sink, which may still be draining 
when the
+      // main stream completes. Poll until it has observed every element 
instead of
+      // asserting once (the single assertion raced under JDK 25 scheduling).
+      val received = new ConcurrentLinkedQueue[Message]()
       val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 
3L), Message("C", 4L))
       Source(messages)
         .asSourceWithContext(_.offset)
-        .alsoTo(Sink.foreach(message => listBuffer.+=(message)))
+        .alsoTo(Sink.foreach(message => received.add(message)))
         .toMat(TestSink[(Message, Long)]())(Keep.right)
         .run()
         .request(4)
@@ -90,17 +95,15 @@ class SourceWithContextSpec extends StreamSpec {
         .expectNext((Message("D", 3L), 3L))
         .expectNext((Message("C", 4L), 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer.toVector shouldBe messages
-        }
+      awaitAssert(received.asScala.toVector shouldBe messages, 10.seconds)
     }
 
     "pass through all data when using alsoToContext" in {
-      val listBuffer = new ListBuffer[Long]()
+      val received = new ConcurrentLinkedQueue[Long]()
       val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 
3L), Message("C", 4L))
       Source(messages)
         .asSourceWithContext(_.offset)
-        .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset)))
+        .alsoToContext(Sink.foreach(offset => received.add(offset)))
         .toMat(TestSink[(Message, Long)]())(Keep.right)
         .run()
         .request(4)
@@ -109,17 +112,15 @@ class SourceWithContextSpec extends StreamSpec {
         .expectNext((Message("D", 3L), 3L))
         .expectNext((Message("C", 4L), 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer.toVector shouldBe messages.map(_.offset)
-        }
+      awaitAssert(received.asScala.toVector shouldBe messages.map(_.offset), 
10.seconds)
     }
 
     "pass through all data when using wireTap" in {
-      val listBuffer = new ListBuffer[Message]()
+      val received = new ConcurrentLinkedQueue[Message]()
       val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 
3L), Message("C", 4L))
       Source(messages)
         .asSourceWithContext(_.offset)
-        .wireTap(Sink.foreach(message => listBuffer.+=(message)))
+        .wireTap(Sink.foreach(message => received.add(message)))
         .toMat(TestSink[(Message, Long)]())(Keep.right)
         .run()
         .request(4)
@@ -128,17 +129,15 @@ class SourceWithContextSpec extends StreamSpec {
         .expectNext((Message("D", 3L), 3L))
         .expectNext((Message("C", 4L), 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer.toVector should contain atLeastOneElementOf messages
-        }
+      awaitAssert(received.asScala.toVector should contain atLeastOneElementOf 
messages, 10.seconds)
     }
 
     "pass through all data when using wireTapContext" in {
-      val listBuffer = new ListBuffer[Long]()
+      val received = new ConcurrentLinkedQueue[Long]()
       val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 
3L), Message("C", 4L))
       Source(messages)
         .asSourceWithContext(_.offset)
-        .wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+        .wireTapContext(Sink.foreach(offset => received.add(offset)))
         .toMat(TestSink[(Message, Long)]())(Keep.right)
         .run()
         .request(4)
@@ -147,9 +146,7 @@ class SourceWithContextSpec extends StreamSpec {
         .expectNext((Message("D", 3L), 3L))
         .expectNext((Message("C", 4L), 4L))
         .expectComplete()
-        .within(10.seconds) {
-          listBuffer.toVector should contain atLeastOneElementOf 
(messages.map(_.offset))
-        }
+      awaitAssert(received.asScala.toVector should contain atLeastOneElementOf 
(messages.map(_.offset)), 10.seconds)
     }
 
     "pass through contexts via a FlowWithContext" in {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to