This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 477149eefc test: await async alsoTo/wireTap side sink in context specs
(#3006)
477149eefc is described below
commit 477149eefcc4df10ac9e8172484e9b769b113c5f
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 29 17:22:19 2026 +0800
test: await async alsoTo/wireTap side sink in context specs (#3006)
Motivation:
SourceWithContextSpec and FlowWithContextSpec fail on JDK 25 nightly
with a real assertion error (not a timeout), e.g.:
Vector(Message(A,1), Message(B,2), Message(D,3)) was not equal to
Vector(Message(A,1), Message(B,2), Message(D,3), Message(C,4))
(SourceWithContextSpec.scala:94) *** FAILED *** (15 milliseconds)
`alsoTo`/`alsoToContext`/`wireTap`/`wireTapContext` feed an
asynchronous side Sink. The tests collected the side stream output
into a buffer and asserted on it immediately after the main stream's
`expectComplete()`. There is no happens-before between the main
stream completing and the side Sink draining its last element, so the
assertion can run before the side Sink has observed every element.
The enclosing `within(10.seconds) { ... }` only bounds how long the
block may take; it does not retry, so the buffer was read exactly
once. On JDK 17 the side Sink usually drained first by luck; JDK 25's
different scheduling surfaces the race as a hard failure in ~15ms.
The shared `scala.collection.mutable.ListBuffer` was also written from
the side-stream dispatcher thread and read from the test thread
without synchronization: a separate, latent data race in addition to
the timing race described above.
Modification:
- Collect side-stream output into a
`java.util.concurrent.ConcurrentLinkedQueue` (thread-safe, preserves
arrival order).
- Replace the single `within(10.seconds) { assert }` with
`awaitAssert(assert, 10.seconds)`, which polls until the async side
Sink has caught up (or the deadline elapses).
Result:
The assertions now wait for the asynchronous side Sink to finish
instead of sampling it once, removing both the timing race and the
cross-thread buffer race. Both specs (24 tests) pass locally; the
checks still fail if the side Sink genuinely drops data, but only once
the 10-second deadline elapses, since awaitAssert keeps retrying and
rethrows the last assertion error at the deadline.
---
.../stream/scaladsl/FlowWithContextSpec.scala | 39 ++++++++++------------
.../stream/scaladsl/SourceWithContextSpec.scala | 39 ++++++++++------------
2 files changed, 36 insertions(+), 42 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
index da9e22f9b9..fd27c27238 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala
@@ -13,8 +13,10 @@
package org.apache.pekko.stream.scaladsl
-import scala.collection.mutable.ListBuffer
+import java.util.concurrent.ConcurrentLinkedQueue
+
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace
import org.apache.pekko
@@ -80,14 +82,17 @@ class FlowWithContextSpec extends StreamSpec {
}
"pass through all data when using alsoTo" in {
- val listBuffer = new ListBuffer[String]()
+ // alsoTo feeds an asynchronous side Sink, which may still be draining
when the
+ // main stream completes. Poll until it has observed every element
instead of
+ // asserting once (the single assertion raced under JDK 25 scheduling).
+ val received = new ConcurrentLinkedQueue[String]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L),
Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long),
(String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
})
- .alsoTo(Sink.foreach(string => listBuffer.+=(string)))
+ .alsoTo(Sink.foreach(string => received.add(string)))
)
.toMat(TestSink[(String, Long)]())(Keep.right)
.run()
@@ -97,20 +102,18 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer should contain theSameElementsInOrderAs List("a", "b",
"d", "c")
- }
+ awaitAssert(received.asScala.toList should contain
theSameElementsInOrderAs List("a", "b", "d", "c"), 10.seconds)
}
"pass through all data when using alsoToContext" in {
- val listBuffer = new ListBuffer[Long]()
+ val received = new ConcurrentLinkedQueue[Long]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L),
Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long),
(String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
})
- .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset)))
+ .alsoToContext(Sink.foreach(offset => received.add(offset)))
)
.toMat(TestSink[(String, Long)]())(Keep.right)
.run()
@@ -120,19 +123,17 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer should contain theSameElementsInOrderAs List(1L, 2L, 3L,
4L)
- }
+ awaitAssert(received.asScala.toList should contain
theSameElementsInOrderAs List(1L, 2L, 3L, 4L), 10.seconds)
}
"pass through all data when using wireTap" in {
- val listBuffer = new ListBuffer[String]()
+ val received = new ConcurrentLinkedQueue[String]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L),
Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long),
(String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
- }).wireTap(Sink.foreach(string => listBuffer.+=(string)))
+ }).wireTap(Sink.foreach(string => received.add(string)))
)
.toMat(TestSink[(String, Long)]())(Keep.right)
.run()
@@ -142,19 +143,17 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer should contain atLeastOneElementOf List("a", "b", "d",
"c")
- }
+ awaitAssert(received.asScala.toList should contain atLeastOneElementOf
List("a", "b", "d", "c"), 10.seconds)
}
"pass through all data when using wireTapContext" in {
- val listBuffer = new ListBuffer[Long]()
+ val received = new ConcurrentLinkedQueue[Long]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L),
Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long),
(String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
- }).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+ }).wireTapContext(Sink.foreach(offset => received.add(offset)))
)
.toMat(TestSink[(String, Long)]())(Keep.right)
.run()
@@ -164,9 +163,7 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L)
- }
+ awaitAssert(received.asScala.toList should contain atLeastOneElementOf
List(1L, 2L, 3L, 4L), 10.seconds)
}
"keep the same order for data and context when using unsafeDataVia" in {
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
index ff578d4629..02b9c9e535 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala
@@ -13,8 +13,10 @@
package org.apache.pekko.stream.scaladsl
-import scala.collection.mutable.ListBuffer
+import java.util.concurrent.ConcurrentLinkedQueue
+
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace
import org.apache.pekko
@@ -77,11 +79,14 @@ class SourceWithContextSpec extends StreamSpec {
}
"pass through all data when using alsoTo" in {
- val listBuffer = new ListBuffer[Message]()
+ // alsoTo feeds an asynchronous side Sink, which may still be draining
when the
+ // main stream completes. Poll until it has observed every element
instead of
+ // asserting once (the single assertion raced under JDK 25 scheduling).
+ val received = new ConcurrentLinkedQueue[Message]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D",
3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
- .alsoTo(Sink.foreach(message => listBuffer.+=(message)))
+ .alsoTo(Sink.foreach(message => received.add(message)))
.toMat(TestSink[(Message, Long)]())(Keep.right)
.run()
.request(4)
@@ -90,17 +95,15 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer.toVector shouldBe messages
- }
+ awaitAssert(received.asScala.toVector shouldBe messages, 10.seconds)
}
"pass through all data when using alsoToContext" in {
- val listBuffer = new ListBuffer[Long]()
+ val received = new ConcurrentLinkedQueue[Long]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D",
3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
- .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset)))
+ .alsoToContext(Sink.foreach(offset => received.add(offset)))
.toMat(TestSink[(Message, Long)]())(Keep.right)
.run()
.request(4)
@@ -109,17 +112,15 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer.toVector shouldBe messages.map(_.offset)
- }
+ awaitAssert(received.asScala.toVector shouldBe messages.map(_.offset),
10.seconds)
}
"pass through all data when using wireTap" in {
- val listBuffer = new ListBuffer[Message]()
+ val received = new ConcurrentLinkedQueue[Message]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D",
3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
- .wireTap(Sink.foreach(message => listBuffer.+=(message)))
+ .wireTap(Sink.foreach(message => received.add(message)))
.toMat(TestSink[(Message, Long)]())(Keep.right)
.run()
.request(4)
@@ -128,17 +129,15 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer.toVector should contain atLeastOneElementOf messages
- }
+ awaitAssert(received.asScala.toVector should contain atLeastOneElementOf
messages, 10.seconds)
}
"pass through all data when using wireTapContext" in {
- val listBuffer = new ListBuffer[Long]()
+ val received = new ConcurrentLinkedQueue[Long]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D",
3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
- .wireTapContext(Sink.foreach(offset => listBuffer.+=(offset)))
+ .wireTapContext(Sink.foreach(offset => received.add(offset)))
.toMat(TestSink[(Message, Long)]())(Keep.right)
.run()
.request(4)
@@ -147,9 +146,7 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
- .within(10.seconds) {
- listBuffer.toVector should contain atLeastOneElementOf
(messages.map(_.offset))
- }
+ awaitAssert(received.asScala.toVector should contain atLeastOneElementOf
(messages.map(_.offset)), 10.seconds)
}
"pass through contexts via a FlowWithContext" in {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]