This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x-onErrorContinue
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 73d5263a7f16ef39ad515e983b03adae8a319d83
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Oct 19 20:24:49 2025 +0800

    feat: Add Flow#onErrorContinue operator. (#2322)
    
    * feat: Add Flow#onErrorContinue operator.
    
    (cherry picked from commit 87b7ae810d8caf55ff7082385f8fa54b90c62611)
---
 .../operators/Source-or-Flow/onErrorContinue.md    | 27 ++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../java/org/apache/pekko/stream/StreamTest.java   |  5 ++
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 86 +++++++++++++++++++
 .../apache/pekko/stream/javadsl/SourceTest.java    | 75 ++++++++++++++++
 .../stream/scaladsl/FlowOnErrorContinueSpec.scala  | 99 ++++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 76 +++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 76 +++++++++++++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 76 +++++++++++++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    | 76 +++++++++++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 62 ++++++++++++++
 11 files changed, 660 insertions(+)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md
new file mode 100644
index 0000000000..66b7e3cfd9
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorContinue.md
@@ -0,0 +1,27 @@
+# onErrorContinue
+
+Continues the stream when an upstream error occurs.
+
+@ref[Error handling](../index.md#error-handling)
+
+## Signature
+
+@apidoc[Source.onErrorContinue](Source) { 
scala="#onErrorContinue(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)%3AFlowOps.this.Repr%5BT%5D"
 java="#onErrorContinue(org.apache.pekko.japi.function.Procedure)" }
+@apidoc[Flow.onErrorContinue](Flow) { 
scala="#onErrorContinue%5BT%20%3C%3A%20Throwable%5D(errorConsumer%3A%20Function%5BThrowable%2C%20Unit%5D)(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D"
 
java="#onErrorContinue(java.lang.Class,org.apache.pekko.japi.function.Procedure)"
 }
+
+## Description
+
+Continues the stream when an upstream error occurs.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** element is available from the upstream
+
+**backpressures** downstream backpressures
+
+**completes** upstream completes or upstream failed with exception this 
operator can't handle
+
+**Cancels when** downstream cancels
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 662cde6f56..dc6feffe26 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -383,6 +383,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 |--|--|--|
 |Source/Flow|<a 
name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to 
`recover` this operators can be used to transform an error signal to a 
different one *without* logging it as an error in the process.|
 |Source/Flow|<a 
name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows
 completing the stream when an upstream error occurs.|
+|Source/Flow|<a 
name="onerrorcontinue"></a>@ref[onErrorContinue](Source-or-Flow/onErrorContinue.md)|Continues
 the stream when an upstream error occurs.|
 |Source/Flow|<a 
name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows
 transforming a failure signal into a stream of elements provided by a factory 
function.|
 |RestartSource|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Source] with a @apidoc[Source] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Source] will not 
restart on completion of the wrapped flow.|
 |RestartFlow|<a 
name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap
 the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it 
fails using an exponential backoff. Notice that this @apidoc[Flow] will not 
restart on completion of the wrapped flow.|
@@ -570,6 +571,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [none](Sink/none.md)
 * [onComplete](Sink/onComplete.md)
 * [onErrorComplete](Source-or-Flow/onErrorComplete.md)
+* [onErrorContinue](Source-or-Flow/onErrorContinue.md)
 * [onErrorResume](Source-or-Flow/onErrorResume.md)
 * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
 * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
index 6692db44e3..63dc25aab3 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/StreamTest.java
@@ -14,6 +14,7 @@
 package org.apache.pekko.stream;
 
 import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.event.LoggingAdapter;
 import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
 import org.scalatestplus.junit.JUnitSuite;
 
@@ -23,4 +24,8 @@ public abstract class StreamTest extends JUnitSuite {
   protected StreamTest(PekkoJUnitActorSystemResource actorSystemResource) {
     system = actorSystemResource.getSystem();
   }
+
+  protected LoggingAdapter logger() {
+    return system.log();
+  }
 }
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 868334d4f8..c2603a8c4d 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -1344,6 +1344,26 @@ public class FlowTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinue() {
+    Source.from(Arrays.asList(1, 2))
+        .via(
+            Flow.of(Integer.class)
+                .map(
+                    elem -> {
+                      if (elem == 2) {
+                        throw new RuntimeException("ex");
+                      } else {
+                        return elem;
+                      }
+                    })
+                .onErrorContinue(error -> logger().error(error, "Error 
occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResume() {
     Source.from(Arrays.asList(1, 2))
@@ -1381,6 +1401,28 @@ public class FlowTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .via(
+            Flow.of(Integer.class)
+                .map(
+                    elem -> {
+                      if (elem == 2) {
+                        throw new IllegalArgumentException("ex");
+                      } else {
+                        return elem;
+                      }
+                    })
+                .onErrorContinue(
+                    IllegalArgumentException.class,
+                    error -> logger().error(error, "Error occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithDedicatedException() {
     Source.from(Arrays.asList(1, 2))
@@ -1421,6 +1463,28 @@ public class FlowTest extends StreamTest {
         .expectError(ex);
   }
 
+  @Test
+  public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .via(
+            Flow.of(Integer.class)
+                .onErrorContinue(
+                    TimeoutException.class, error -> logger().error(error, 
"Error occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
   @Test
   public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
     final IllegalArgumentException ex = new IllegalArgumentException("ex");
@@ -1458,6 +1522,28 @@ public class FlowTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .via(
+            Flow.of(Integer.class)
+                .map(
+                    elem -> {
+                      if (elem == 2) {
+                        throw new IllegalArgumentException("Boom");
+                      } else {
+                        return elem;
+                      }
+                    })
+                .onErrorContinue(
+                    ex -> ex.getMessage().contains("Boom"),
+                    error -> logger().error(error, "Error occurred")))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithPredicate() {
     Source.from(Arrays.asList(1, 2))
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 9e63236fbe..081f657820 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1665,6 +1665,24 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinue() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new RuntimeException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(e -> logger().error(e, "Error encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResume() {
     Source.from(Arrays.asList(1, 2))
@@ -1702,6 +1720,25 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithDedicatedException() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("ex");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(
+            IllegalArgumentException.class, e -> logger().error(e, "Error 
encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithDedicatedException() {
     Source.from(Arrays.asList(1, 2))
@@ -1740,6 +1777,25 @@ public class SourceTest extends StreamTest {
         .expectError(ex);
   }
 
+  @Test
+  public void mustBeAbleToFailWhenOnErrorContinueExceptionTypeNotMatch() {
+    final IllegalArgumentException ex = new IllegalArgumentException("ex");
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw ex;
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error 
encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectError(ex);
+  }
+
   @Test
   public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
     final IllegalArgumentException ex = new IllegalArgumentException("ex");
@@ -1777,6 +1833,25 @@ public class SourceTest extends StreamTest {
         .expectComplete();
   }
 
+  @Test
+  public void mustBeAbleToOnErrorContinueWithPredicate() {
+    Source.from(Arrays.asList(1, 2))
+        .map(
+            elem -> {
+              if (elem == 2) {
+                throw new IllegalArgumentException("Boom");
+              } else {
+                return elem;
+              }
+            })
+        .onErrorContinue(
+            ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, 
"Error encountered"))
+        .runWith(TestSink.probe(system), system)
+        .request(2)
+        .expectNext(1)
+        .expectComplete();
+  }
+
   @Test
   public void mustBeAbleToOnErrorResumeWithPredicate() {
     Source.from(Arrays.asList(1, 2))
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
new file mode 100644
index 0000000000..8a153a6a67
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowOnErrorContinueSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import scala.concurrent.TimeoutException
+import scala.util.control.NoStackTrace
+
+import org.apache.pekko
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+
+class FlowOnErrorContinueSpec extends StreamSpec {
+  val ex = new RuntimeException("ex") with NoStackTrace
+
+  "A onErrorContinue" must {
+    "can complete with all exceptions" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 1) throw ex else a
+        }
+        .onErrorContinue[Throwable](log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(2)
+        .expectComplete()
+    }
+
+    "can complete with dedicated exception type" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new IllegalArgumentException() else a
+        }
+        .onErrorContinue[IllegalArgumentException](log.error(_, "Error 
occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can fail if an unexpected exception occur" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new IllegalArgumentException() else a
+        }
+        .onErrorContinue[TimeoutException](log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(1)
+        .expectNext(1)
+        .request(1)
+        .expectError()
+    }
+
+    "can complete if the pf is applied" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw new TimeoutException() else a
+        }
+        .onErrorContinue {
+          case _: TimeoutException => true
+          case _                   => false
+        }(log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectComplete()
+    }
+
+    "can fail if the pf is not applied" in {
+      Source(List(1, 2))
+        .map { a =>
+          if (a == 2) throw ex else a
+        }
+        .onErrorContinue {
+          case _: TimeoutException => true
+          case _                   => false
+        }(log.error(_, "Error occurred"))
+        .runWith(TestSink[Int]())
+        .request(2)
+        .expectNext(1)
+        .expectError()
+    }
+
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index c816535a94..f170615b57 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -2270,6 +2270,82 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
javadsl.Flow[In, Out, Mat] =
+    new Flow(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T],
+      errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, 
Out, Mat] =
+    new Flow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](predicate: function.Predicate[_ >: 
Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Flow[In, 
Out, Mat] =
+    new Flow(delegate.onErrorContinue(predicate.test)(errorConsumer.apply))
+
   /**
    * Transform a failure signal into a stream of elements provided by a 
factory function.
    * This allows to continue processing with another stream when a failure 
occurs.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 00e2376d36..99113e9929 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2540,6 +2540,82 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
javadsl.Source[Out, Mat] =
+    new Source(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T], errorConsumer: 
function.Procedure[_ >: Throwable])
+      : javadsl.Source[Out, Mat] =
+    new Source(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): javadsl.Source[Out, 
Mat] =
+    new Source(delegate.onErrorContinue(p.test)(errorConsumer.apply))
+
   /**
    * Transform a failure signal into a Source of elements provided by a 
factory function.
    * This allows to continue processing with another stream when a failure 
occurs.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index edb3b907b8..7a9fc52370 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1470,6 +1470,82 @@ class SubFlow[In, Out, Mat](
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, 
Mat] =
+    new 
SubFlow(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubFlow[In, Out, 
Mat] =
+    new SubFlow(delegate.onErrorContinue(p.test)(errorConsumer.apply))
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index bc77cb5bee..cd72bbf987 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1445,6 +1445,82 @@ class SubSource[Out, Mat](
       case ex: Throwable if predicate.test(ex) => true
     })
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(errorConsumer: function.Procedure[_ >: Throwable]): 
SubSource[Out, Mat] =
+    new SubSource(delegate.onErrorContinue[Throwable](errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param clazz the class of the failure cause
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](clazz: Class[T],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+    new 
SubSource(delegate.onErrorContinue(clazz.isInstance)(errorConsumer.apply))
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate which determines if the exception should be handled
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](p: function.Predicate[_ >: Throwable],
+      errorConsumer: function.Procedure[_ >: Throwable]): SubSource[Out, Mat] =
+    new SubSource(delegate.onErrorContinue(p.test)(errorConsumer.apply))
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index cf4d921df9..d3d161b7ba 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -20,6 +20,7 @@ import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.pekko
 import pekko.Done
@@ -1058,6 +1059,67 @@ trait FlowOps[+Out, +Mat] {
         }: PartialFunction[Boolean, Graph[SourceShape[Out], NotUsed]]))
         .withAttributes(DefaultAttributes.onErrorComplete and 
SourceLocation.forLambda(pf)))
 
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue[T <: Throwable](errorConsumer: Throwable => 
Unit)(implicit tag: ClassTag[T]): Repr[Out] = {
+    this.withAttributes(ActorAttributes.supervisionStrategy {
+      case NonFatal(e) if tag.runtimeClass.isInstance(e) =>
+        errorConsumer(e)
+        Supervision.Resume
+      case _ => Supervision.Stop
+    })
+  }
+
+  /**
+   * Continues the stream when an upstream error occurs.
+   *
+   * When an error is signaled from upstream, the `errorConsumer` function is 
invoked with the
+   * `Throwable`, and the stream resumes processing subsequent elements. The 
element that caused
+   * the error is dropped.
+   *
+   * '''Note:''' This operator requires stream operators to support 
supervision. If supervision
+   * is not supported, this operator will have no effect.
+   *
+   * '''Emits when''' an element is available from upstream
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param p predicate to determine which errors to handle
+   * @param errorConsumer function invoked when an error occurs
+   * @since 1.3.0
+   */
+  def onErrorContinue(p: Throwable => Boolean)(errorConsumer: Throwable => 
Unit): Repr[Out] = {
+    this.withAttributes(ActorAttributes.supervisionStrategy {
+      case NonFatal(e) if p(e) =>
+        errorConsumer(e)
+        Supervision.Resume
+      case _ => Supervision.Stop
+    })
+  }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to