This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch onErrorResume in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 75c89a800bc0a0694a071a508d37165b0ca2e5f5 Author: He-Pin <[email protected]> AuthorDate: Sun Aug 31 16:28:55 2025 +0800 feat: Add Flow/Source#onErrorResume for javadsl. --- .../operators/Source-or-Flow/onErrorResume.md | 32 +++++ docs/src/main/paradox/stream/operators/index.md | 2 + .../org/apache/pekko/stream/javadsl/FlowTest.java | 88 +++++++++++- .../apache/pekko/stream/javadsl/SourceTest.java | 149 +++++++++++++++++++++ .../apache/pekko/stream/DslConsistencySpec.scala | 4 +- .../org/apache/pekko/stream/javadsl/Flow.scala | 75 +++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 76 +++++++++++ 7 files changed, 421 insertions(+), 5 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md new file mode 100644 index 0000000000..321c55027a --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md @@ -0,0 +1,32 @@ +# onErrorResume + +Allows transforming a failure signal into a stream of elements provided by a factory function. 
+ +@ref[Error handling](../index.md#error-handling) + +## Signature + +@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(org.apache.pekko.japi.function.Function)" }<br> +@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" }<br> +@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" }<br> +@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(org.apache.pekko.japi.function.Function)" }<br> +@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" }<br> +@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" } + + +## Description + +Transform a failure signal into a stream of elements provided by a factory function. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** element is available from the upstream or upstream is failed and fallback Source produces an element + +**backpressures** downstream backpressures + +**completes** upstream completes or upstream failed with exception and fallback Source completes + +**Cancels when** downstream cancels +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 8f46341db3..c1cd3e21e0 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -370,6 +370,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) |--|--|--| |Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.| |Source/Flow|<a 
name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occurs.| +|Source/Flow|<a name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows transforming a failure signal into a stream of elements provided by a factory function.| |RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.| |RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.| |Source/Flow|<a name="recover"></a>@ref[recover](Source-or-Flow/recover.md)|Allow sending of one last element downstream when a failure has happened upstream.| @@ -546,6 +547,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [none](Sink/none.md) * [onComplete](Sink/onComplete.md) * [onErrorComplete](Source-or-Flow/onErrorComplete.md) +* [onErrorResume](Source-or-Flow/onErrorResume.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [optionalVia](Source-or-Flow/optionalVia.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f11c7a7e31..2137589d6d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1338,13 +1338,32 @@ public class FlowTest extends StreamTest { 
return elem; } }) - .onErrorComplete() + .via(Flow.of(Integer.class).onErrorComplete()) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) .expectComplete(); } + @Test + public void mustBeAbleToOnErrorResume() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .via(Flow.of(Integer.class).onErrorResume(e -> Source.single(0))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorCompleteWithDedicatedException() { Source.from(Arrays.asList(1, 2)) @@ -1356,10 +1375,31 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete(IllegalArgumentException.class) + .via(Flow.of(Integer.class).onErrorComplete(IllegalArgumentException.class)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResumeWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorResume(IllegalArgumentException.class, e -> Source.single(0))) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) + .expectNext(0) .expectComplete(); } @@ -1375,7 +1415,26 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete(TimeoutException.class) + .via(Flow.of(Integer.class).onErrorComplete(TimeoutException.class)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + 
.via(Flow.of(Integer.class).onErrorResume(TimeoutException.class, e -> Source.single(0))) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) @@ -1393,10 +1452,31 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete(ex -> ex.getMessage().contains("Boom")) + .via(Flow.of(Integer.class).onErrorComplete(ex -> ex.getMessage().contains("Boom"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResumeWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0))) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) + .expectNext(0) .expectComplete(); } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index b3331212e3..3e47d66c3a 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1647,4 +1647,153 @@ public class SourceTest extends StreamTest { .expectNext("Message1", "Message2") .expectComplete(); } + + @Test + public void mustBeAbleToOnErrorComplete() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorComplete() + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResume() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorResume(e -> Source.single(0)) + 
.runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorCompleteWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorComplete(IllegalArgumentException.class) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResumeWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorResume(IllegalArgumentException.class, e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + + @Test + public void mustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorComplete(TimeoutException.class) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorResume(TimeoutException.class, e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void mustBeAbleToOnErrorCompleteWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + 
.onErrorComplete(ex -> ex.getMessage().contains("Boom")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResumeWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index 7c78fe1af3..544e418fdf 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -70,7 +70,9 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "andThenMat", "isIdentity", "withAttributes", - "transformMaterializing") ++ + "transformMaterializing", + "onErrorResume" // Java Only, Scala use `recoverWith` + ) ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") val graphHelpers = Set( diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 4eedf5f2ca..06ef05ae14 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2112,6 +2112,81 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case ex: Throwable if predicate.test(ex) => true }) + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. 
+ * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out](fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]) + : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith { + case ex: Throwable => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. 
+ * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class object of the failure cause + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + clazz: Class[_ <: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]) + : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith { + case ex: Throwable if clazz.isInstance(ex) => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. 
+ * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param predicate Predicate which determines if the exception should be handled + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + predicate: function.Predicate[_ >: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]) + : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith { + case ex: Throwable if predicate.test(ex) => fallback(ex) + }) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index cb940dc047..c537b6f9c4 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2349,6 +2349,82 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case ex: Throwable if predicate.test(ex) => true }) + /** + * Transform a failure signal into a Source of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. 
+ * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] = + new Source(delegate.recoverWith { + case ex: Throwable => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class object of the failure cause + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + clazz: Class[_ <: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] = + new Source(delegate.recoverWith { + case ex: Throwable if clazz.isInstance(ex) => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. 
+ * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param predicate Predicate which determines if the exception should be handled + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + predicate: function.Predicate[_ >: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] = + new Source(delegate.recoverWith { + case ex: Throwable if predicate.test(ex) => fallback(ex) + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream.
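For readers skimming the diff, the three `onErrorResume` overloads differ only in which failures they match before switching to the fallback. The sketch below is a plain-Java model of that matching rule, not Pekko code: the class `OnErrorResumeModel` and the helpers `runWithResume` and `failingOnTwo` are hypothetical names invented for illustration, and the model pulls elements synchronously, so it does not reproduce the out-of-band failure signal (and possible element dropping) described in the Scaladoc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, library-free model of the matching rule; not the Pekko implementation.
public class OnErrorResumeModel {

  // Pushes upstream elements into `sink`. If upstream fails and `handles`
  // accepts the failure, continues with the elements from `fallback`;
  // otherwise the original failure is rethrown unchanged.
  public static <T> void runWithResume(
      Iterator<T> upstream,
      Predicate<Throwable> handles,
      Function<Throwable, Iterator<T>> fallback,
      List<T> sink) {
    try {
      while (upstream.hasNext()) {
        sink.add(upstream.next());
      }
    } catch (RuntimeException ex) {
      if (!handles.test(ex)) {
        throw ex; // unmatched failure: propagate downstream
      }
      Iterator<T> rest = fallback.apply(ex);
      while (rest.hasNext()) {
        sink.add(rest.next());
      }
    }
  }

  // Upstream that emits 1 and then throws `failure` in place of element 2,
  // mirroring the map stage used in the tests of this commit.
  public static Iterator<Integer> failingOnTwo(RuntimeException failure) {
    return new Iterator<Integer>() {
      private int next = 1;

      @Override
      public boolean hasNext() {
        return next <= 2;
      }

      @Override
      public Integer next() {
        int elem = next++;
        if (elem == 2) {
          throw failure;
        }
        return elem;
      }
    };
  }

  public static void main(String[] args) {
    // Models onErrorResume(fallback): every failure is matched.
    List<Integer> resumed = new ArrayList<>();
    runWithResume(
        failingOnTwo(new RuntimeException("ex")),
        ex -> true,
        ex -> Arrays.asList(0).iterator(),
        resumed);
    System.out.println(resumed); // prints [1, 0]

    // Models onErrorResume(TimeoutException.class, fallback) with a
    // non-matching failure: the original exception is propagated.
    List<Integer> unmatched = new ArrayList<>();
    try {
      runWithResume(
          failingOnTwo(new IllegalArgumentException("ex")),
          TimeoutException.class::isInstance,
          ex -> Arrays.asList(0).iterator(),
          unmatched);
    } catch (IllegalArgumentException e) {
      System.out.println(unmatched + " then failed: " + e.getMessage()); // prints [1] then failed: ex
    }
  }
}
```

The two cases correspond to `mustBeAbleToOnErrorResume` and `onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch` in the tests above; the predicate overload would be modelled by passing a lambda such as `ex -> ex.getMessage().contains("Boom")` as `handles`.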
