This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.4.x-recoverWithFix in repository https://gitbox.apache.org/repos/asf/pekko.git
commit f32a1dbae296c23ca0660ffaf1b62a86570ccd70 Author: He-Pin(kerr) <[email protected]> AuthorDate: Mon Jan 19 17:34:12 2026 +0800 fix: Fix recoverWith on Failed stage. (#2631) (cherry picked from commit 029c5572a5cdb6f10b23d56b502507f6878a5441) --- .../stream/scaladsl/FlowRecoverWithSpec.scala | 36 +++++++++++++++++++--- .../apache/pekko/stream/scaladsl/SourceSpec.scala | 29 ++++++++--------- .../org/apache/pekko/stream/impl/fusing/Ops.scala | 5 +-- .../org/apache/pekko/stream/javadsl/Flow.scala | 4 +++ .../org/apache/pekko/stream/javadsl/Source.scala | 4 +++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 2 ++ .../apache/pekko/stream/javadsl/SubSource.scala | 2 ++ .../org/apache/pekko/stream/scaladsl/Flow.scala | 2 ++ 8 files changed, 62 insertions(+), 22 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala index 3e3779fd7a..8dfba1a3ce 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowRecoverWithSpec.scala @@ -73,11 +73,39 @@ class FlowRecoverWithSpec extends StreamSpec { } "recover with a failed future source" in { - Source.failed(ex) - .recoverWith { case _: Throwable => Source.future(Future.failed(ex)) } + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source.failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 100) { + Source.future(Future.failed(ex)) + } else { + Source.single(101) + } + } .runWith(TestSink[Int]()) - .request(1) - .expectError(ex) + .request(100) + .expectNext(101) + .expectComplete() + counter.get() shouldBe 100 + } + + "recover with a failed source" in { + val counter = new java.util.concurrent.atomic.AtomicInteger(0) + Source.failed[Int](ex) + .recoverWith { + case _: Throwable => + if (counter.incrementAndGet() < 100) { + Source.failed(ex) + } else { + Source.single(101) + } + } + .runWith(TestSink[Int]()) + .request(100) + .expectNext(101) + .expectComplete() + counter.get() shouldBe 100 } "recover with a java stream source" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 60a11566d9..2f71af0b5a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -611,7 +611,7 @@ class SourceSpec extends StreamSpec with DefaultTimeout { val counter = new java.util.concurrent.atomic.AtomicInteger() val source = - withRetriesTest(failedSource("origin")) { _ => + withRetriesTest(failedSource("origin")) { () => counter.incrementAndGet() exceptionSource() } { _ => @@ -625,33 +625,30 @@ class SourceSpec extends StreamSpec with DefaultTimeout { assert(counter.get() == 3) } - "not retry FailedSources" in { - // https://github.com/apache/pekko/issues/2620 + "should retry on a failed source" in { val counter = new java.util.concurrent.atomic.AtomicInteger() val source = - withRetriesTest(failedSource("origin")) { _ => - counter.incrementAndGet() - failedSource("does not work") - } { _ => - counter.get() < 3 - } + withRetriesTest(failedSource("origin")) { () => + if (counter.incrementAndGet() < 3) { + failedSource("does not work") + } else Source.single(ByteString.fromString("ok")) + } { _ => true } + .runWith(Sink.head) + val result = Await.result(source, Duration.Inf) + assert(result.utf8String == "ok") - assertThrows[ArithmeticException] { - Await.result(source.runWith(Sink.ignore), Duration.Inf) - } - - assert(counter.get() == 1) + assert(counter.get() == 3) } } - private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: Long => Source[ByteString, NotUsed])( + private def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: () => Source[ByteString, NotUsed])( shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, NotUsed] = originSource.recoverWithRetries( -1, { case e: Throwable if shouldRetry(e) => - fallbackTo(0) + fallbackTo() } ).mapMaterializedValue(_ => NotUsed) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index bc390db144..c7c5b852ce 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -2183,6 +2183,7 @@ private[pekko] object TakeWithin { override def onPull(): Unit = pull(in) @nowarn("msg=Any") + @tailrec def onFailure(ex: Throwable): Unit = { import Collect.NotApplied if (maximumRetries < 0 || attempt < maximumRetries) { @@ -2194,10 +2195,10 @@ private[pekko] object TakeWithin { TraversalBuilder.getValuePresentedSource(source) match { case OptionVal.Some(graph) => graph match { case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage()) - case failed: FailedSource[T @unchecked] => failStage(failed.failure) + case failed: FailedSource[T @unchecked] => onFailure(failed.failure) case futureSource: FutureSource[T @unchecked] => futureSource.future.value match { case Some(Success(elem)) => emit(out, elem, () => completeStage()) - case Some(Failure(ex)) => failStage(ex) + case Some(Failure(ex)) => onFailure(ex) case None => switchTo(source) attempt += 1 diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 24e6359300..568d437cee 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2243,6 +2243,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -2265,6 +2267,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 5cc5401219..3643bd0e1e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2504,6 +2504,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * @@ -2526,6 +2528,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 9127e9323f..9f463001cb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1528,6 +1528,8 @@ class SubFlow[In, Out, Mat]( * * Throwing an exception inside ``recoverWith`` _will_ be logged on ERROR level automatically. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 0f798dc2dd..f1b62a8901 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1498,6 +1498,8 @@ class SubSource[Out, Mat]( * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This operator can recover the failure signal, but not the skipped elements, which will be dropped. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index bb0d7f8181..c834d249cb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -975,6 +975,8 @@ trait FlowOps[+Out, +Mat] { * * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. * + * It will keep trying to recover indefinitely, if you want to limit the number of attempts, use `recoverWithRetries`. + * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
