This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.5.x-recoverWithFix
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit a2a4793ea7ab9ba2ca4fe407860acfda7fe76733
Author: PJ Fanning <[email protected]>
AuthorDate: Sun Jan 18 10:31:47 2026 +0100

    add test case that shows we don't retry FailedSources (#2624)
    
    * add test case that shows we don't retry FailedSources
    
    * Update SourceSpec.scala
    
    * Update SourceSpec.scala
    
    * Update SourceSpec.scala
    
    (cherry picked from commit 8031d622d1362c03c100203dc26b3621316aa204)
---
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 57 ++++++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index c36793220a..8533a5c11d 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -29,6 +29,7 @@ import pekko.NotUsed
 import pekko.stream.testkit._
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.EventFilter
+import pekko.util.ByteString
 
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -685,4 +686,60 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         .expectComplete()
     }
   }
+
+  "recoverWithRetries" must {
+    "retry when exceptions occur" in {
+      val counter = new java.util.concurrent.atomic.AtomicInteger()
+
+      val source =
+        withRetriesTest(failedSource("origin")) { _ =>
+          counter.incrementAndGet()
+          exceptionSource()
+        } { _ =>
+          counter.get() < 3
+        }
+
+      assertThrows[ArithmeticException] {
+        Await.result(source.runWith(Sink.ignore), Duration.Inf)
+      }
+
+      assert(counter.get() == 3)
+    }
+
+    "not retry FailedSources" in {
+      // https://github.com/apache/pekko/issues/2620
+      val counter = new java.util.concurrent.atomic.AtomicInteger()
+
+      val source =
+        withRetriesTest(failedSource("origin")) { _ =>
+          counter.incrementAndGet()
+          failedSource("does not work")
+        } { _ =>
+          counter.get() < 3
+        }
+
+      assertThrows[ArithmeticException] {
+        Await.result(source.runWith(Sink.ignore), Duration.Inf)
+      }
+
+      assert(counter.get() == 1)
+    }
+  }
+
+  private def withRetriesTest(originSource: Source[ByteString, 
Any])(fallbackTo: Long => Source[ByteString, NotUsed])(
+      shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, 
NotUsed] =
+    originSource.recoverWithRetries(
+      -1,
+      {
+        case e: Throwable if shouldRetry(e) =>
+          fallbackTo(0)
+      }
+    ).mapMaterializedValue(_ => NotUsed)
+
+  private def failedSource(message: String): Source[ByteString, NotUsed] =
+    Source.failed(new ArithmeticException(message))
+
+  // has adivide by zero exception
+  private def exceptionSource(): Source[ByteString, NotUsed] =
+    Source.single(5).map(_ / 0).map(s => ByteString.fromString(s.toString))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to