This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 5addd96f70 Fix stream IO dispatcher test failures (#2806)
5addd96f70 is described below

commit 5addd96f70c22565af4db28f3c6909a3c67d8890
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 19:38:49 2026 +0800

    Fix stream IO dispatcher test failures (#2806)
    
    Avoid creating custom materializers in tests that verify dispatcher
    assignment. Custom materializers are not covered by StreamSpec's
    afterEach cleanup, causing test pollution and failures.
    
    Changes:
    - Remove custom ActorMaterializer creation in FileSinkSpec,
      FileSourceSpec, and UnfoldResourceAsyncSourceSpec
    - Use SystemMaterializer(system).materializer to access the system
      materializer supervisor for dispatcher assertions
    - Remove separate ActorSystem creation in FileSourceSpec dispatcher
      tests (no longer needed with system materializer)
    - Clean up StreamSpec debug dump: replace non-functional stream
      supervisor query with printDebugDump helper, add stopAllChildren
      on failure for cleanup
    
    Upstream: akka/akka-core@145319d86d
    Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
---
 .../apache/pekko/stream/testkit/StreamSpec.scala   | 42 ++++++----------
 .../org/apache/pekko/stream/io/FileSinkSpec.scala  |  7 ++-
 .../apache/pekko/stream/io/FileSourceSpec.scala    | 56 +++++++++-------------
 .../scaladsl/UnfoldResourceAsyncSourceSpec.scala   |  7 ++-
 4 files changed, 43 insertions(+), 69 deletions(-)

diff --git 
a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
 
b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
index 6032e57c8f..43c458f52d 100644
--- 
a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
+++ 
b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
@@ -15,17 +15,14 @@ package org.apache.pekko.stream.testkit
 
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.Future
-import scala.concurrent.duration._
+import scala.concurrent.duration.FiniteDuration
 
 import org.apache.pekko
-import pekko.actor.{ ActorRef, ActorSystem }
+import pekko.actor.ActorSystem
 import pekko.stream.Materializer
 import pekko.stream.impl.PhasedFusingActorMaterializer
-import pekko.stream.impl.StreamSupervisor
-import pekko.stream.snapshot.{ MaterializerState, StreamSnapshotImpl }
-import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, 
stopAllChildren }
-import pekko.testkit.{ PekkoSpec, TestProbe }
+import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren, 
printDebugDump, stopAllChildren }
+import pekko.testkit.PekkoSpec
 import pekko.testkit.TestKitUtils
 
 import org.scalatest.Failed
@@ -49,31 +46,22 @@ abstract class StreamSpec(_system: ActorSystem) extends 
PekkoSpec(_system) with
   override def withFixture(test: NoArgTest) = {
     super.withFixture(test) match {
       case failed: Failed =>
-        implicit val ec = system.dispatcher
-        val probe = TestProbe()(system)
-        // FIXME I don't think it always runs under /user anymore (typed)
-        // FIXME correction - I'm not sure this works at _all_ - supposed to 
dump stream state if test fails
-        val streamSupervisors = system.actorSelection("/user/" + 
StreamSupervisor.baseName + "*")
-        streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref)
-        val children: Seq[ActorRef] = probe
-          .receiveWhile(2.seconds) {
-            case StreamSupervisor.Children(children) => children
-          }
-          .flatten
-        println("--- Stream actors debug dump ---")
-        if (children.isEmpty) println("Stream is completed. No debug 
information is available")
-        else {
-          println("Stream actors alive: " + children)
-          Future
-            .sequence(children.map(MaterializerState.requestFromChild))
-            .foreach(snapshots =>
-              snapshots.foreach(s =>
-                
pekko.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl])))
+        Materializer(_system) match {
+          case impl: PhasedFusingActorMaterializer =>
+            implicit val ec = impl.system.dispatcher
+            println("--- Stream actors debug dump (only works for tests using 
system materializer) ---")
+            printDebugDump(impl.supervisor)
+            println("--- Stream actors debug dump end ---")
+            stopAllChildren(impl.system, impl.supervisor)
+          case _ =>
         }
         failed
       case other =>
         Materializer(_system) match {
           case impl: PhasedFusingActorMaterializer =>
+            // Note that this is different from assertAllStages stopped since 
it tries to
+            // *kill* all streams first, before checking if any is stuck. It 
also does not
+            // work for tests starting their own materializers.
             stopAllChildren(impl.system, impl.supervisor)
             val result = test.apply()
             assertNoChildren(impl.system, impl.supervisor,
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
index 8dfd06209c..30e208af68 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
@@ -32,14 +32,13 @@ import pekko.stream.scaladsl.{ FileIO, Keep, Source }
 import pekko.stream.testkit._
 import pekko.stream.testkit.Utils._
 import pekko.util.ByteString
+import pekko.stream.SystemMaterializer
 
 import org.scalatest.concurrent.ScalaFutures
 
 @nowarn
 class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with 
ScalaFutures {
 
-  val settings = 
ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher")
-  implicit val materializer: Materializer = ActorMaterializer(settings)
   val fs = Jimfs.newFileSystem("FileSinkSpec", Configuration.unix())
 
   val TestLines = {
@@ -171,7 +170,7 @@ class FileSinkSpec extends 
StreamSpec(UnboundedMailboxConfig) with ScalaFutures
       targetFile { f =>
         val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run()
         try {
-          materializer
+          SystemMaterializer(system).materializer
             .asInstanceOf[PhasedFusingActorMaterializer]
             .supervisor
             .tell(StreamSupervisor.GetChildren, testActor)
@@ -195,7 +194,7 @@ class FileSinkSpec extends 
StreamSpec(UnboundedMailboxConfig) with ScalaFutures
             Keep.left)
           .run()
         try {
-          materializer
+          SystemMaterializer(system).materializer
             .asInstanceOf[PhasedFusingActorMaterializer]
             .supervisor
             .tell(StreamSupervisor.GetChildren, testActor)
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
index 08724f9b0a..8f6a417568 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration._
 import com.google.common.jimfs.{ Configuration, Jimfs }
 
 import org.apache.pekko
-import pekko.actor.ActorSystem
 import pekko.stream._
 import pekko.stream.IOResult._
 import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
@@ -35,6 +34,7 @@ import pekko.stream.testkit._
 import pekko.stream.testkit.Utils._
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.util.ByteString
+import pekko.stream.SystemMaterializer
 
 object FileSourceSpec {
   final case class Settings(chunkSize: Int, readAhead: Int)
@@ -43,9 +43,6 @@ object FileSourceSpec {
 @nowarn
 class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
 
-  val settings = 
ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher")
-  implicit val materializer: Materializer = ActorMaterializer(settings)
-
   val fs = Jimfs.newFileSystem("FileSourceSpec", Configuration.unix())
 
   val TestText = {
@@ -261,39 +258,30 @@ class FileSourceSpec extends 
StreamSpec(UnboundedMailboxConfig) {
     }
 
     "use dedicated blocking-io-dispatcher by default" in {
-      val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
-      val materializer = ActorMaterializer()(sys)
-      try {
-        val p = FileIO.fromPath(manyLines).runWith(TestSink())(materializer)
-
-        materializer
-          .asInstanceOf[PhasedFusingActorMaterializer]
-          .supervisor
-          .tell(StreamSupervisor.GetChildren, testActor)
-        val ref = expectMsgType[Children].children.find(_.path.toString 
contains "fileSource").get
-        try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
-        finally p.cancel()
-      } finally shutdown(sys)
+      val p = FileIO.fromPath(manyLines).runWith(TestSink())
+
+      SystemMaterializer(system).materializer
+        .asInstanceOf[PhasedFusingActorMaterializer]
+        .supervisor
+        .tell(StreamSupervisor.GetChildren, testActor)
+      val ref = expectMsgType[Children].children.find(_.path.toString contains 
"fileSource").get
+      try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
+      finally p.cancel()
     }
 
     "allow overriding the dispatcher using Attributes" in {
-      val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
-      val materializer = ActorMaterializer()(sys)
-
-      try {
-        val p = FileIO
-          .fromPath(manyLines)
-          
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
-          .runWith(TestSink())(materializer)
-
-        materializer
-          .asInstanceOf[PhasedFusingActorMaterializer]
-          .supervisor
-          .tell(StreamSupervisor.GetChildren, testActor)
-        val ref = expectMsgType[Children].children.find(_.path.toString 
contains "fileSource").get
-        try assertDispatcher(ref, "pekko.actor.default-dispatcher")
-        finally p.cancel()
-      } finally shutdown(sys)
+      val p = FileIO
+        .fromPath(manyLines)
+        
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
+        .runWith(TestSink())
+
+      SystemMaterializer(system).materializer
+        .asInstanceOf[PhasedFusingActorMaterializer]
+        .supervisor
+        .tell(StreamSupervisor.GetChildren, testActor)
+      val ref = expectMsgType[Children].children.find(_.path.toString contains 
"fileSource").get
+      try assertDispatcher(ref, "pekko.actor.default-dispatcher")
+      finally p.cancel()
     }
 
     "not signal onComplete more than once" in {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
index ac261e039c..07a42be8d8 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
@@ -26,6 +26,7 @@ import pekko.Done
 import pekko.stream.ActorAttributes
 import pekko.stream.Materializer
 import pekko.stream.Supervision
+import pekko.stream.SystemMaterializer
 import pekko.stream.impl.PhasedFusingActorMaterializer
 import pekko.stream.impl.StreamSupervisor
 import pekko.stream.impl.StreamSupervisor.Children
@@ -325,9 +326,6 @@ class UnfoldResourceAsyncSourceSpec extends 
StreamSpec(UnboundedMailboxConfig) {
     }
 
     "use dedicated blocking-io-dispatcher by default" in {
-      // use a separate materializer to ensure we know what child is our stream
-      implicit val materializer = Materializer(system)
-
       Source
         .unfoldResourceAsync[String, Unit](
           () => Promise[Unit]().future, // never complete
@@ -335,7 +333,8 @@ class UnfoldResourceAsyncSourceSpec extends 
StreamSpec(UnboundedMailboxConfig) {
           _ => ???)
         .runWith(Sink.ignore)
 
-      
materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren,
 testActor)
+      
SystemMaterializer(system).materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(
+        StreamSupervisor.GetChildren, testActor)
       val ref = expectMsgType[Children].children.find(_.path.toString contains 
"unfoldResourceSourceAsync").get
       assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to