This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.3.x-onErrorContinue in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 73d5263a7f16ef39ad515e983b03adae8a319d83 Author: He-Pin(kerr) <[email protected]> AuthorDate: Sun Oct 19 20:24:49 2025 +0800 feat: Add Flow#onErrorContinue operator. (#2322) * feat: Add Flow#onErrorContinue operator. (cherry picked from commit 87b7ae810d8caf55ff7082385f8fa54b90c62611) --- .../operators/Source-or-Flow/onErrorContinue.md | 27 ++++++ docs/src/main/paradox/stream/operators/index.md | 2 + .../java/org/apache/pekko/stream/StreamTest.java | 5 ++ .../org/apache/pekko/stream/javadsl/FlowTest.java | 86 +++++++++++++++++++ .../apache/pekko/stream/javadsl/SourceTest.java | 75 ++++++++++++++++ .../stream/scaladsl/FlowOnErrorContinueSpec.scala | 99 ++++++++++++++++++++++ .../org/apache/pekko/stream/javadsl/Flow.scala | 76 +++++++++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 76 +++++++++++++++++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 76 +++++++++++++++++ .../apache/pekko/stream/javadsl/SubSource.scala | 76 +++++++++++++++++ .../org/apache/pekko/stream/scaladsl/Flow.scala | 62 ++++++++++++++ 11 files changed, 660 insertions(+) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md new file mode 100644 index 0000000000..66b7e3cfd9 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md @@ -0,0 +1,27 @@ +# onErrorContinue + +Continues the stream when an upstream error occurs. 
+ +@ref[Error handling](../index.md#error-handling) + +## Signature + +@apidoc[Source.onErrorContinue](Source) { scala="#onErrorContinue(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorContinue(org.apache.pekko.japi.function.Procedure)" } +@apidoc[Flow.onErrorContinue](Flow) { scala="#onErrorContinue%5BT%20%3C%3A%20Throwable%5D(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorContinue(java.lang.Class,org.apache.pekko.japi.function.Procedure)" } + +## Description + +Continues the stream when an upstream error occurs. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** element is available from the upstream + +**backpressures** downstream backpressures + +**completes** upstream completes; if upstream fails with an exception this operator does not handle, the stream fails instead + +**Cancels when** downstream cancels +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 662cde6f56..dc6feffe26 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -383,6 +383,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) |--|--|--| |Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.| |Source/Flow|<a name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occurs.| +|Source/Flow|<a name="onerrorcontinue"></a>@ref[onErrorContinue](Source-or-Flow/onErrorContinue.md)|Continues the stream when an upstream error occurs.| |Source/Flow|<a name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows
transforming a failure signal into a stream of elements provided by a factory function.| |RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.| |RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.| @@ -570,6 +571,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [none](Sink/none.md) * [onComplete](Sink/onComplete.md) * [onErrorComplete](Source-or-Flow/onErrorComplete.md) +* [onErrorContinue](Source-or-Flow/onErrorContinue.md) * [onErrorResume](Source-or-Flow/onErrorResume.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java index 6692db44e3..63dc25aab3 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java @@ -14,6 +14,7 @@ package org.apache.pekko.stream; import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.scalatestplus.junit.JUnitSuite; @@ -23,4 +24,8 @@ public abstract class StreamTest extends JUnitSuite { protected StreamTest(PekkoJUnitActorSystemResource actorSystemResource) { system = actorSystemResource.getSystem(); } + + protected LoggingAdapter logger() { + return 
system.log(); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 868334d4f8..c2603a8c4d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1344,6 +1344,26 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinue() { + Source.from(Arrays.asList(1, 2)) + .via( + Flow.of(Integer.class) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorContinue(error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResume() { Source.from(Arrays.asList(1, 2)) @@ -1381,6 +1401,28 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .via( + Flow.of(Integer.class) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorContinue( + IllegalArgumentException.class, + error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithDedicatedException() { Source.from(Arrays.asList(1, 2)) @@ -1421,6 +1463,28 @@ public class FlowTest extends StreamTest { .expectError(ex); } + @Test + public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .via( + 
Flow.of(Integer.class) + .onErrorContinue( + TimeoutException.class, error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + @Test public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { final IllegalArgumentException ex = new IllegalArgumentException("ex"); @@ -1458,6 +1522,28 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .via( + Flow.of(Integer.class) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorContinue( + ex -> ex.getMessage().contains("Boom"), + error -> logger().error(error, "Error occurred"))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithPredicate() { Source.from(Arrays.asList(1, 2)) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 9e63236fbe..081f657820 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1665,6 +1665,24 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinue() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorContinue(e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResume() { Source.from(Arrays.asList(1, 2)) @@ -1702,6 +1720,25 @@ public class SourceTest extends StreamTest { 
.expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorContinue( + IllegalArgumentException.class, e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithDedicatedException() { Source.from(Arrays.asList(1, 2)) @@ -1740,6 +1777,25 @@ public class SourceTest extends StreamTest { .expectError(ex); } + @Test + public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + @Test public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { final IllegalArgumentException ex = new IllegalArgumentException("ex"); @@ -1777,6 +1833,25 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToOnErrorContinueWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorContinue( + ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, "Error encountered")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorResumeWithPredicate() { Source.from(Arrays.asList(1, 2)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala new file mode 100644 index 0000000000..8a153a6a67 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.stream.scaladsl + +import scala.concurrent.TimeoutException +import scala.util.control.NoStackTrace + +import org.apache.pekko +import pekko.stream.testkit.StreamSpec +import pekko.stream.testkit.scaladsl.TestSink + +class FlowOnErrorContinueSpec extends StreamSpec { + val ex = new RuntimeException("ex") with NoStackTrace + + "A onErrorContinue" must { + "can complete with all exceptions" in { + Source(List(1, 2)) + .map { a => + if (a == 1) throw ex else a + } + .onErrorContinue[Throwable](log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(2) + .expectComplete() + } + + "can complete with dedicated exception type" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new IllegalArgumentException() else a + } + .onErrorContinue[IllegalArgumentException](log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can fail if an unexpected exception occur" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new IllegalArgumentException() else a + } + .onErrorContinue[TimeoutException](log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(1) + .request(1) + .expectError() + } + + "can complete if the pf is applied" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw new TimeoutException() else a + } + .onErrorContinue { + case _: TimeoutException => true + case _ => false + }(log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectComplete() + } + + "can fail if the pf is not applied" in { + Source(List(1, 2)) + .map { a => + if (a == 2) throw ex else a + } + .onErrorContinue { + case _: TimeoutException => true + case _ => false + }(log.error(_, "Error occurred")) + .runWith(TestSink[Int]()) + .request(2) + .expectNext(1) + .expectError() + } + + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index c816535a94..f170615b57 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2270,6 +2270,82 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. 
+ * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], + errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param predicate predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](predicate: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.onErrorContinue(predicate.test)(errorConsumer.apply)) + + /** * Transform a failure signal into a stream of elements provided by a factory function. * This allows to continue processing with another stream when a failure occurs.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 00e2376d36..99113e9929 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2540,6 +2540,82 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, Mat] = + new Source(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. 
+ * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], errorConsumer: function.Procedure[_ >: Throwable]) + : javadsl.Source[Out, Mat] = + new Source(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, Mat] = + new Source(delegate.onErrorContinue(p.test)(errorConsumer.apply)) + /** * Transform a failure signal into a Source of elements provided by a factory function. * This allows to continue processing with another stream when a failure occurs. 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index edb3b907b8..7a9fc52370 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1470,6 +1470,82 @@ class SubFlow[In, Out, Mat]( case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. 
+ * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], + errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.onErrorContinue(p.test)(errorConsumer.apply)) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. 
So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index bc77cb5bee..cd72bbf987 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1445,6 +1445,82 @@ class SubSource[Out, Mat]( case ex: Throwable if predicate.test(ex) => true }) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorContinue[Throwable](errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. 
+ * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class of the failure cause + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](clazz: Class[T], + errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply)) + + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate which determines if the exception should be handled + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable], + errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] = + new SubSource(delegate.onErrorContinue(p.test)(errorConsumer.apply)) + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. 
So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index cf4d921df9..d3d161b7ba 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -20,6 +20,7 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag +import scala.util.control.NonFatal import org.apache.pekko import pekko.Done @@ -1058,6 +1059,67 @@ trait FlowOps[+Out, +Mat] { }: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]])) .withAttributes(DefaultAttributes.onErrorComplete and SourceLocation.forLambda(pf))) + /** + * Continues the stream when an upstream error occurs. + * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue[T <: Throwable](errorConsumer: Throwable => Unit)(implicit tag: ClassTag[T]): Repr[Out] = { + this.withAttributes(ActorAttributes.supervisionStrategy { + case NonFatal(e) if tag.runtimeClass.isInstance(e) => + errorConsumer(e) + Supervision.Resume + case _ => Supervision.Stop + }) + } + + /** + * Continues the stream when an upstream error occurs. 
+ * + * When an error is signaled from upstream, the `errorConsumer` function is invoked with the + * `Throwable`, and the stream resumes processing subsequent elements. The element that caused + * the error is dropped. + * + * '''Note:''' This operator requires stream operators to support supervision. If supervision + * is not supported, this operator will have no effect. + * + * '''Emits when''' an element is available from upstream + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param p predicate to determine which errors to handle + * @param errorConsumer function invoked when an error occurs + * @since 1.3.0 + */ + def onErrorContinue(p: Throwable => Boolean)(errorConsumer: Throwable => Unit): Repr[Out] = { + this.withAttributes(ActorAttributes.supervisionStrategy { + case NonFatal(e) if p(e) => + errorConsumer(e) + Supervision.Resume + case _ => Supervision.Stop + }) + } + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
