This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch recover in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 88bdcd89ea75dd331c6c8447eb9d6a95fb2352ee Author: He-Pin <[email protected]> AuthorDate: Sun Oct 19 23:03:00 2025 +0800 feat: Add more recover operators for java dsl. --- .../org/apache/pekko/stream/javadsl/Flow.scala | 66 ++++++++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 66 ++++++++++++++++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 88 ++++++++++++++++++++++ .../apache/pekko/stream/javadsl/SubSource.scala | 80 ++++++++++++++++++++ 4 files changed, 300 insertions(+) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index b8be99a2b9..24f295d0aa 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1894,6 +1894,72 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case elem if clazz.isInstance(elem) => creator.create() } + /** + * Recover allows to send last element on failure and gracefully complete the stream + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception that is an instance of `clazz` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): javadsl.Flow[In, Out, Mat] = + recover { + case elem if clazz.isInstance(elem) => fallbackValue + } + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = + recover { + case elem if p.test(elem) => creator.create() + } + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): javadsl.Flow[In, Out, Mat] = + recover { + case elem if p.test(elem) => fallbackValue + } + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 127146a1a9..4291ffe193 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2131,6 +2131,72 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case elem if clazz.isInstance(elem) => creator.create() } + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception that is an instance of `clazz` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): javadsl.Source[Out, Mat] = + recover { + case elem if clazz.isInstance(elem) => fallbackValue + } + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): javadsl.Source[Out, Mat] = + recover { + case elem if p.test(elem) => creator.create() + } + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): javadsl.Source[Out, Mat] = + recover { + case elem if p.test(elem) => fallbackValue + } + /** * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 8da6eb21c0..35e6537e5c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1265,6 +1265,94 @@ final class SubFlow[In, Out, Mat]( def recover(pf: PartialFunction[Throwable, Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.recover(pf)) + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception that is an instance of `clazz` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.recover { + case elem if clazz.isInstance(elem) => creator.create() + }) + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception that is an instance of `clazz` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): SubFlow[In, Out, Mat] = + new SubFlow(delegate.recover { + case elem if clazz.isInstance(elem) => fallbackValue + }) + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.recover { + case elem if p.test(elem) => creator.create() + }) + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): SubFlow[In, Out, Mat] = + new SubFlow(delegate.recover { + case elem if p.test(elem) => fallbackValue + }) + /** * RecoverWith allows to switch to alternative Source on flow failure. 
It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 5521a2f157..d26503ed65 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1247,6 +1247,86 @@ final class SubSource[Out, Mat]( def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] = new SubSource(delegate.recover(pf)) + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception that is an instance of `clazz` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): SubSource[Out, Mat] = + new SubSource(delegate.recover { + case elem if clazz.isInstance(elem) => creator.create() + }) + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception that is an instance of `clazz` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception that is an instance of `clazz` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(clazz: Class[_ <: Throwable], fallbackValue: Out): SubSource[Out, Mat] = + new SubSource(delegate.recover { + case elem if clazz.isInstance(elem) => fallbackValue + }) + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], creator: function.Creator[Out]): SubSource[Out, Mat] = + new SubSource(delegate.recover { + case elem if p.test(elem) => creator.create() + }) + + /** + * Recover allows sending a last element on failure and gracefully completing the stream. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. 
+ * + * '''Emits when''' element is available from the upstream or upstream has failed with an exception accepted by the predicate `p` + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with an exception accepted by the predicate `p` + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def recover(p: function.Predicate[_ >: Throwable], fallbackValue: Out): SubSource[Out, Mat] = + new SubSource(delegate.recover { + case elem if p.test(elem) => fallbackValue + }) + /** * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
