This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch recover
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 88bdcd89ea75dd331c6c8447eb9d6a95fb2352ee
Author: He-Pin <[email protected]>
AuthorDate: Sun Oct 19 23:03:00 2025 +0800

    feat: Add more recover operators for Java DSL.
---
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 66 ++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 66 ++++++++++++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 88 ++++++++++++++++++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    | 80 ++++++++++++++++++++
 4 files changed, 300 insertions(+)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index b8be99a2b9..24f295d0aa 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1894,6 +1894,72 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       case elem if clazz.isInstance(elem) => creator.create()
     }
 
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an instance of `clazz`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): 
javadsl.Flow[In, Out, Mat] =
+    recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    }
+
+  /**
+   * Recover allows to send a last element, created by `creator`, on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): javadsl.Flow[In, Out, Mat] =
+    recover {
+      case elem if p.test(elem) => creator.create()
+    }
+
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
javadsl.Flow[In, Out, Mat] =
+    recover {
+      case elem if p.test(elem) => fallbackValue
+    }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 127146a1a9..4291ffe193 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2131,6 +2131,72 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       case elem if clazz.isInstance(elem) => creator.create()
     }
 
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an instance of `clazz`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): 
javadsl.Source[Out, Mat] =
+    recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    }
+
+  /**
+   * Recover allows to send a last element, created by `creator`, on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): javadsl.Source[Out, Mat] =
+    recover {
+      case elem if p.test(elem) => creator.create()
+    }
+
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
javadsl.Source[Out, Mat] =
+    recover {
+      case elem if p.test(elem) => fallbackValue
+    }
+
   /**
    * While similar to [[recover]] this operator can be used to transform an 
error signal to a different one *without* logging
    * it as an error in the process. So in that sense it is NOT exactly 
equivalent to `recover(t => throw t2)` since recover
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 8da6eb21c0..35e6537e5c 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -1265,6 +1265,94 @@ final class SubFlow[In, Out, Mat](
   def recover(pf: PartialFunction[Throwable, Out]): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.recover(pf))
 
+  /**
+   * Recover allows to send a last element, created by `creator`, on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an instance of `clazz`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if clazz.isInstance(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an instance of `clazz`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): SubFlow[In, 
Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    })
+
+  /**
+   * Recover allows to send a last element, created by `creator`, on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if p.test(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.recover {
+      case elem if p.test(elem) => fallbackValue
+    })
+
   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It 
will stay in effect after
    * a failure has been recovered so that each time there is a failure it is 
fed into the `pf` and a new
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 5521a2f157..d26503ed65 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -1247,6 +1247,86 @@ final class SubSource[Out, Mat](
   def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] =
     new SubSource(delegate.recover(pf))
 
+  /**
+   * Recover allows to send a last element, created by `creator`, on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an instance of `clazz`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): 
SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if clazz.isInstance(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an instance of `clazz`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): 
SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if clazz.isInstance(elem) => fallbackValue
+    })
+
+  /**
+   * Recover allows to send a last element, created by `creator`, on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], creator: 
function.Creator[Out]): SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if p.test(elem) => creator.create()
+    })
+
+  /**
+   * Recover allows to send `fallbackValue` as a last element on failure and gracefully complete the stream.
+   * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
+   * This operator can recover the failure signal, but not the skipped elements, which will be dropped.
+   *
+   * '''Emits when''' element is available from the upstream or upstream is failed with an exception accepted by `p`
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes or upstream failed with an exception accepted by `p`
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @since 1.3.0
+   */
+  def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): 
SubSource[Out, Mat] =
+    new SubSource(delegate.recover {
+      case elem if p.test(elem) => fallbackValue
+    })
+
   /**
    * RecoverWith allows to switch to alternative Source on flow failure. It 
will stay in effect after
    * a failure has been recovered so that each time there is a failure it is 
fed into the `pf` and a new


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to