This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch onErrorResume
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 75c89a800bc0a0694a071a508d37165b0ca2e5f5
Author: He-Pin <[email protected]>
AuthorDate: Sun Aug 31 16:28:55 2025 +0800

    feat: Add Flow/Source#onErrorResume for javadsl.
---
 .../operators/Source-or-Flow/onErrorResume.md      |  32 +++++
 docs/src/main/paradox/stream/operators/index.md    |   2 +
 .../org/apache/pekko/stream/javadsl/FlowTest.java  |  88 +++++++++++-
 .../apache/pekko/stream/javadsl/SourceTest.java    | 149 +++++++++++++++++++++
 .../apache/pekko/stream/DslConsistencySpec.scala   |   4 +-
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  75 +++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |  76 +++++++++++
 7 files changed, 421 insertions(+), 5 deletions(-)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md
new file mode 100644
index 0000000000..321c55027a
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md
@@ -0,0 +1,32 @@
+# onErrorResume
+
+Allows transforming a failure signal into a stream of elements provided by a 
factory function.
+
+@ref[Error handling](../index.md#error-handling)
+
+## Signature
+
+@apidoc[Source.onErrorResume](Source) { 
java="#onErrorResume(org.apache.pekko.japi.function.Function)" }<br>
+@apidoc[Source.onErrorResume](Source) { 
java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" 
}<br>
+@apidoc[Source.onErrorResume](Source) { 
java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)"
 }<br>
+@apidoc[Flow.onErrorResume](Flow) { 
java="#onErrorResume(org.apache.pekko.japi.function.Function)" }<br>
+@apidoc[Flow.onErrorResume](Flow) { 
java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" 
}<br>
+@apidoc[Flow.onErrorResume](Flow) { 
java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)"
 }
+
+
+## Description
+
+Transform a failure signal into a stream of elements provided by a factory 
function.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** element is available from the upstream or upstream is failed and 
fallback Source produces an element
+
+**backpressures** downstream backpressures
+
+**completes** upstream completes or upstream failed with exception and 
fallback Source completes
+
+**Cancels when** downstream cancels
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 8f46341db3..c1cd3e21e0 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -370,6 +370,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 |--|--|--|
 |Source/Flow|<a 
name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to 
`recover` this operators can be used to transform an error signal to a 
different one *without* logging it as an error in the process.|
 |Source/Flow|<a 
name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows
 completing the stream when an upstream error occurs.|
+|Source/Flow|<a 
name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows
 transforming a failure signal into a stream of elements provided by a factory 
function.|
 |RestartSource|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Source] with a @apidoc[Source] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Source] will not 
restart on completion of the wrapped flow.|
 |RestartFlow|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Flow] will not 
restart on completion of the wrapped flow.|
 |Source/Flow|<a 
name="recover"></a>@ref[recover](Source-or-Flow/recover.md)|Allow sending of 
one last element downstream when a failure has happened upstream.|
@@ -546,6 +547,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [none](Sink/none.md)
 * [onComplete](Sink/onComplete.md)
 * [onErrorComplete](Source-or-Flow/onErrorComplete.md)
+* [onErrorResume](Source-or-Flow/onErrorResume.md)
 * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
 * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
 * [optionalVia](Source-or-Flow/optionalVia.md)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index f11c7a7e31..2137589d6d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1338,13 +1338,32 @@ public class FlowTest extends StreamTest {
                 return elem;
               }
             })
-        .onErrorComplete()
+        .via(Flow.of(Integer.class).onErrorComplete())
         .runWith(TestSink.probe(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorResume() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .via(Flow.of(Integer.class).onErrorResume(e -> Source.single(0)))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectNext(0)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorCompleteWithDedicatedException() {
     Source.from(Arrays.asList(1, 2))
@@ -1356,10 +1375,31 @@ public class FlowTest extends StreamTest {
                 return elem;
               }
             })
-        .onErrorComplete(IllegalArgumentException.class)
+        
.via(Flow.of(Integer.class).onErrorComplete(IllegalArgumentException.class))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorResumeWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorResume(IllegalArgumentException.class, e -> 
Source.single(0)))
         .runWith(TestSink.probe(system), system)
         .request(2)
         .expectNext(1)
+        .expectNext(0)
         .expectComplete();
   }
 
@@ -1375,7 +1415,26 @@ public class FlowTest extends StreamTest {
                 return elem;
               }
             })
-        .onErrorComplete(TimeoutException.class)
+        .via(Flow.of(Integer.class).onErrorComplete(TimeoutException.class))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
+  @Test
+  public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .via(Flow.of(Integer.class).onErrorResume(TimeoutException.class, e -> 
Source.single(0)))
         .runWith(TestSink.probe(system), system)
         .request(2)
         .expectNext(1)
@@ -1393,10 +1452,31 @@ public class FlowTest extends StreamTest {
                 return elem;
               }
             })
-        .onErrorComplete(ex -> ex.getMessage().contains("Boom"))
+        .via(Flow.of(Integer.class).onErrorComplete(ex -> 
ex.getMessage().contains("Boom")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorResumeWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> 
Source.single(0)))
         .runWith(TestSink.probe(system), system)
         .request(2)
         .expectNext(1)
+        .expectNext(0)
         .expectComplete();
   }
 
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index b3331212e3..3e47d66c3a 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1647,4 +1647,153 @@ public class SourceTest extends StreamTest {
         .expectNext("Message1", "Message2")
         .expectComplete();
   }
+
+  @Test
+  public void mustBeAbleToOnErrorComplete() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete()
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorResume() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorResume(e -> Source.single(0))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectNext(0)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorCompleteWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete(IllegalArgumentException.class)
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorResumeWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorResume(IllegalArgumentException.class, e -> Source.single(0))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectNext(0)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToFailWhenExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete(TimeoutException.class)
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
+  @Test
+  public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .onErrorResume(TimeoutException.class, e -> Source.single(0))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorCompleteWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorComplete(ex -> ex.getMessage().contains("Boom"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
+  @Test
+  public void mustBeAbleToOnErrorResumeWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> 
Source.single(0))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectNext(0)
+        .expectComplete();
+  }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
index 7c78fe1af3..544e418fdf 100755
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala
@@ -70,7 +70,9 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
       "andThenMat",
       "isIdentity",
       "withAttributes",
-      "transformMaterializing") ++
+      "transformMaterializing",
+      "onErrorResume" // Java Only, Scala use `recoverWith`
+    ) ++
     Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
 
   val graphHelpers = Set(
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 4eedf5f2ca..06ef05ae14 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2112,6 +2112,81 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Transform a failure signal into a stream of elements provided by a 
factory function.
+   * This allows to continue processing with another stream when a failure 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and fallback Source produces an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
and fallback Source completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param fallback Function which produces a Source to continue the stream
+   * @since 2.0.0
+   */
+  def onErrorResume[T >: Out](fallback: function.Function[_ >: Throwable, _ <: 
Graph[SourceShape[T], NotUsed]])
+      : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith {
+    case ex: Throwable => fallback(ex)
+  })
+
+  /**
+   * Transform a failure signal into a stream of elements provided by a 
factory function.
+   * This allows to continue processing with another stream when a failure 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and fallback Source produces an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
and fallback Source completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class object of the failure cause
+   * @param fallback Function which produces a Source to continue the stream
+   * @since 2.0.0
+   */
+  def onErrorResume[T >: Out](
+      clazz: Class[_ <: Throwable],
+      fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], 
NotUsed]])
+      : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith {
+    case ex: Throwable if clazz.isInstance(ex) => fallback(ex)
+  })
+
+  /**
+   * Transform a failure signal into a stream of elements provided by a 
factory function.
+   * This allows to continue processing with another stream when a failure 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and fallback Source produces an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
and fallback Source completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param predicate Predicate which determines if the exception should be 
handled
+   * @param function Function which produces a Source to continue the stream
+   * @since 2.0.0
+   */
+  def onErrorResume[T >: Out](
+      predicate: function.Predicate[_ >: Throwable],
+      fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], 
NotUsed]])
+      : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith {
+    case ex: Throwable if predicate.test(ex) => fallback(ex)
+  })
+
   /**
    * Terminate processing (and cancel the upstream publisher) after the given
    * number of elements. Due to input buffering some elements may have been
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index cb940dc047..c537b6f9c4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2349,6 +2349,82 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Transform a failure signal into a Source of elements provided by a 
factory function.
+   * This allows to continue processing with another stream when a failure 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and fallback Source produces an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
and fallback Source completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param fallback Function which produces a Source to continue the stream
+   * @since 2.0.0
+   */
+  def onErrorResume[T >: Out](
+      fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], 
NotUsed]]): javadsl.Source[T, Mat] =
+    new Source(delegate.recoverWith {
+      case ex: Throwable => fallback(ex)
+    })
+
+  /**
+   * Transform a failure signal into a stream of elements provided by a 
factory function.
+   * This allows to continue processing with another stream when a failure 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and fallback Source produces an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
and fallback Source completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class object of the failure cause
+   * @param fallback Function which produces a Source to continue the stream
+   * @since 2.0.0
+   */
+  def onErrorResume[T >: Out](
+      clazz: Class[_ <: Throwable],
+      fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], 
NotUsed]]): javadsl.Source[T, Mat] =
+    new Source(delegate.recoverWith {
+      case ex: Throwable if clazz.isInstance(ex) => fallback(ex)
+    })
+
+  /**
+   * Transform a failure signal into a stream of elements provided by a 
factory function.
+   * This allows to continue processing with another stream when a failure 
occurs.
+   *
+   * Since the underlying failure signal onError arrives out-of-band, it might 
jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped 
elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is 
failed and fallback Source produces an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with exception 
and fallback Source completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param predicate Predicate which determines if the exception should be 
handled
+   * @param fallback Function which produces a Source to continue the stream
+   * @since 2.0.0
+   */
+  def onErrorResume[T >: Out](
+      predicate: function.Predicate[_ >: Throwable],
+      fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], 
NotUsed]]): javadsl.Source[T, Mat] =
+    new Source(delegate.recoverWith {
+      case ex: Throwable if predicate.test(ex) => fallback(ex)
+    })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to