Hello hAkkers,

Consider this simple test (scalatest 3.0.0, scalacheck 1.13.2 and akka-stream-testkit 2.4.11 are used):

    describe("Either") {

      implicit def noShrink[T]: Shrink[List[T]] = Shrink.shrinkAny

      it("should filter 'Either.Left(..)' objects") {
        forAll("left", "right.head", "right.tail") { (ls: List[String], rh: Int, rt: List[Int]) ⇒
          val ri = rh :: rt

          // Will fail if last element is not Right(...)
          val source: List[Either[String, Int]] =
            ls.map(Left[String, Int]) ::: ri.map(Right[String, Int])

          val (pub, sub) = TestSource.probe[Either[String, Int]]
            .via(Flow[Either[String, Int]].filter(_.isRight).map { case Right(i) ⇒ i })
            .toMat(TestSink.probe[Int])(Keep.both)
            .run()

          sub.request(ri.size.toLong)
          source.foreach(pub.sendNext)
          sub.expectNextUnorderedN(ri)

          pub.sendComplete()
          sub.expectComplete()
        }
      }
    }

This test fails with `Message: assertion failed: timeout (3 seconds) during expectMsg while waiting for OnComplete` as soon as we swap the order of the `ls` and `ri` lists (or shuffle the combined list):

    val source: List[Either[String, Int]] =
      ri.map(Right[String, Int]) ::: ls.map(Left[String, Int])

Actually, it fails whenever the last element in the Source is not `Right(..)`. It seems to me that the leftover `Left` elements in the Source prevent TestSink.probe from receiving the onComplete message.

Is there any way to prevent this failure, or is it normal and expected behavior?