This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.5.x-recoverWithFix in repository https://gitbox.apache.org/repos/asf/pekko.git
commit a2a4793ea7ab9ba2ca4fe407860acfda7fe76733 Author: PJ Fanning <[email protected]> AuthorDate: Sun Jan 18 10:31:47 2026 +0100 add test case that shows we don't retry FailedSources (#2624) * add test case that shows we don't retry FailedSources * Update SourceSpec.scala * Update SourceSpec.scala * Update SourceSpec.scala (cherry picked from commit 8031d622d1362c03c100203dc26b3621316aa204) --- .../apache/pekko/stream/scaladsl/SourceSpec.scala | 57 ++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index c36793220a..8533a5c11d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -29,6 +29,7 @@ import pekko.NotUsed import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.TestSink import pekko.testkit.EventFilter +import pekko.util.ByteString import scala.collection.immutable import scala.concurrent.duration._ @@ -685,4 +686,60 @@ class SourceSpec extends StreamSpec with DefaultTimeout { .expectComplete() } } + + "recoverWithRetries" must { + "retry when exceptions occur" in { + val counter = new java.util.concurrent.atomic.AtomicInteger() + + val source = + withRetriesTest(failedSource("origin")) { _ => + counter.incrementAndGet() + exceptionSource() + } { _ => + counter.get() < 3 + } + + assertThrows[ArithmeticException] { + Await.result(source.runWith(Sink.ignore), Duration.Inf) + } + + assert(counter.get() == 3) + } + + "not retry FailedSources" in { + // https://github.com/apache/pekko/issues/2620 + val counter = new java.util.concurrent.atomic.AtomicInteger() + + val source = + withRetriesTest(failedSource("origin")) { _ => + counter.incrementAndGet() + failedSource("does not work") + } { _ => + counter.get() < 3 + } + + assertThrows[ArithmeticException] 
{ + Await.result(source.runWith(Sink.ignore), Duration.Inf) + } + + assert(counter.get() == 1) + } + } + + private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: Long => Source[ByteString, NotUsed])( + shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, NotUsed] = + originSource.recoverWithRetries( + -1, + { + case e: Throwable if shouldRetry(e) => + fallbackTo(0) + } + ).mapMaterializedValue(_ => NotUsed) + + private def failedSource(message: String): Source[ByteString, NotUsed] = + Source.failed(new ArithmeticException(message)) + + // has a divide by zero exception + private def exceptionSource(): Source[ByteString, NotUsed] = + Source.single(5).map(_ / 0).map(s => ByteString.fromString(s.toString)) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
