This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.5.x by this push:
     new 9643567961 fix: Fix recoverWith on Failed stage. #2631 (#2633)
9643567961 is described below

commit 964356796189133e48e58e6091516a94e6187a05
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Jan 19 18:56:42 2026 +0800

    fix: Fix recoverWith on Failed stage. #2631 (#2633)
    
    * add test case that shows we don't retry FailedSources (#2624)
    
    * add test case that shows we don't retry FailedSources
    
    * Update SourceSpec.scala
    
    * Update SourceSpec.scala
    
    * Update SourceSpec.scala
    
    (cherry picked from commit 8031d622d1362c03c100203dc26b3621316aa204)
    
    * fix: Fix recoverWith on Failed stage. (#2631)
    
    (cherry picked from commit 029c5572a5cdb6f10b23d56b502507f6878a5441)
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../stream/scaladsl/FlowRecoverWithSpec.scala      | 36 +++++++++++++--
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 54 ++++++++++++++++++++++
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  |  5 +-
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  4 ++
 .../org/apache/pekko/stream/javadsl/Source.scala   |  4 ++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  |  2 +
 .../apache/pekko/stream/javadsl/SubSource.scala    |  2 +
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  2 +
 8 files changed, 103 insertions(+), 6 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
index 3e3779fd7a..8dfba1a3ce 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala
@@ -73,11 +73,39 @@ class FlowRecoverWithSpec extends StreamSpec {
     }
 
     "recover with a failed future source" in {
-      Source.failed(ex)
-        .recoverWith { case _: Throwable => Source.future(Future.failed(ex)) }
+      val counter = new java.util.concurrent.atomic.AtomicInteger(0)
+      Source.failed[Int](ex)
+        .recoverWith {
+          case _: Throwable =>
+            if (counter.incrementAndGet() < 100) {
+              Source.future(Future.failed(ex))
+            } else {
+              Source.single(101)
+            }
+        }
         .runWith(TestSink[Int]())
-        .request(1)
-        .expectError(ex)
+        .request(100)
+        .expectNext(101)
+        .expectComplete()
+      counter.get() shouldBe 100
+    }
+
+    "recover with a failed source" in {
+      val counter = new java.util.concurrent.atomic.AtomicInteger(0)
+      Source.failed[Int](ex)
+        .recoverWith {
+          case _: Throwable =>
+            if (counter.incrementAndGet() < 100) {
+              Source.failed(ex)
+            } else {
+              Source.single(101)
+            }
+        }
+        .runWith(TestSink[Int]())
+        .request(100)
+        .expectNext(101)
+        .expectComplete()
+      counter.get() shouldBe 100
     }
 
     "recover with a java stream source" in {
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index c36793220a..9bc7808ee3 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -29,6 +29,7 @@ import pekko.NotUsed
 import pekko.stream.testkit._
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.EventFilter
+import pekko.util.ByteString
 
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -685,4 +686,57 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         .expectComplete()
     }
   }
+
+  "recoverWithRetries" must {
+    "retry when exceptions occur" in {
+      val counter = new java.util.concurrent.atomic.AtomicInteger()
+
+      val source =
+        withRetriesTest(failedSource("origin")) { () =>
+          counter.incrementAndGet()
+          exceptionSource()
+        } { _ =>
+          counter.get() < 3
+        }
+
+      assertThrows[ArithmeticException] {
+        Await.result(source.runWith(Sink.ignore), Duration.Inf)
+      }
+
+      assert(counter.get() == 3)
+    }
+
+    "should retry on a failed source" in {
+      val counter = new java.util.concurrent.atomic.AtomicInteger()
+
+      val source =
+        withRetriesTest(failedSource("origin")) { () =>
+          if (counter.incrementAndGet() < 3) {
+            failedSource("does not work")
+          } else Source.single(ByteString.fromString("ok"))
+        } { _ => true }
+          .runWith(Sink.head)
+      val result = Await.result(source, Duration.Inf)
+      assert(result.utf8String == "ok")
+
+      assert(counter.get() == 3)
+    }
+  }
+
+  private def withRetriesTest(originSource: Source[ByteString, 
Any])(fallbackTo: () => Source[ByteString, NotUsed])(
+      shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, 
NotUsed] =
+    originSource.recoverWithRetries(
+      -1,
+      {
+        case e: Throwable if shouldRetry(e) =>
+          fallbackTo()
+      }
+    ).mapMaterializedValue(_ => NotUsed)
+
+  private def failedSource(message: String): Source[ByteString, NotUsed] =
+    Source.failed(new ArithmeticException(message))
+
+  // has adivide by zero exception
+  private def exceptionSource(): Source[ByteString, NotUsed] =
+    Source.single(5).map(_ / 0).map(s => ByteString.fromString(s.toString))
 }
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index bc390db144..c7c5b852ce 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2183,6 +2183,7 @@ private[pekko] object TakeWithin {
       override def onPull(): Unit = pull(in)
 
       @nowarn("msg=Any")
+      @tailrec
       def onFailure(ex: Throwable): Unit = {
         import Collect.NotApplied
         if (maximumRetries < 0 || attempt < maximumRetries) {
@@ -2194,10 +2195,10 @@ private[pekko] object TakeWithin {
               TraversalBuilder.getValuePresentedSource(source) match {
                 case OptionVal.Some(graph) => graph match {
                     case singleSource: SingleSource[T @unchecked] => emit(out, 
singleSource.elem, () => completeStage())
-                    case failed: FailedSource[T @unchecked]       => 
failStage(failed.failure)
+                    case failed: FailedSource[T @unchecked]       => 
onFailure(failed.failure)
                     case futureSource: FutureSource[T @unchecked] => 
futureSource.future.value match {
                         case Some(Success(elem)) => emit(out, elem, () => 
completeStage())
-                        case Some(Failure(ex))   => failStage(ex)
+                        case Some(Failure(ex))   => onFailure(ex)
                         case None                =>
                           switchTo(source)
                           attempt += 1
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 24e6359300..568d437cee 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2243,6 +2243,8 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * Throwing an exception inside `recoverWith` _will_ be logged on ERROR 
level automatically.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *
@@ -2265,6 +2267,8 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * Throwing an exception inside `recoverWith` _will_ be logged on ERROR 
level automatically.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 94fdf248e2..6052572e6f 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2505,6 +2505,8 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * Throwing an exception inside `recoverWith` _will_ be logged on ERROR 
level automatically.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *
@@ -2527,6 +2529,8 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * Throwing an exception inside `recoverWith` _will_ be logged on ERROR 
level automatically.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 9127e9323f..9f463001cb 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1528,6 +1528,8 @@ class SubFlow[In, Out, Mat](
    *
    * Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR 
level automatically.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 0f798dc2dd..f1b62a8901 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1498,6 +1498,8 @@ class SubSource[Out, Mat](
    * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
    * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index bb0d7f8181..c834d249cb 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -975,6 +975,8 @@ trait FlowOps[+Out, +Mat] {
    *
    * Throwing an exception inside `recoverWith` _will_ be logged on ERROR 
level automatically.
    *
+   * It will keep trying to recover indefinitely, if you want to limit the 
number of attempts, use `recoverWithRetries`.
+   *
    * '''Emits when''' element is available from the upstream or upstream is 
failed and element is available
    * from alternative Source
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to