This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 77532c1bb5 feat: Add retry with predicate (#1269)
77532c1bb5 is described below

commit 77532c1bb5d4e8078b3e031ecac03697edbc58fc
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Apr 23 20:34:16 2024 +0800

    feat: Add retry with predicate (#1269)
---
 .../scala/org/apache/pekko/pattern/RetrySpec.scala |  34 +++
 .../scala/org/apache/pekko/pattern/Patterns.scala  | 244 ++++++++++++++++++++-
 .../org/apache/pekko/pattern/RetrySupport.scala    | 244 ++++++++++++++++++---
 docs/src/test/java/jdocs/future/FutureDocTest.java |  13 ++
 4 files changed, 499 insertions(+), 36 deletions(-)

diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala 
b/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
index 607ed61d65..5aa9e093c3 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
@@ -22,6 +22,8 @@ import org.apache.pekko
 import pekko.actor.Scheduler
 import pekko.testkit.PekkoSpec
 
+import java.util.concurrent.atomic.AtomicInteger
+
 class RetrySpec extends PekkoSpec with RetrySupport {
   implicit val ec: ExecutionContextExecutor = system.dispatcher
   implicit val scheduler: Scheduler = system.scheduler
@@ -152,6 +154,38 @@ class RetrySpec extends PekkoSpec with RetrySupport {
         Await.result(retried, remaining) should ===(5)
       }
     }
+
+    "be able to retry with predicate for value" in {
+      val counter = new AtomicInteger(0)
+      def attempt(): Future[Int] = {
+        Future.successful(counter.incrementAndGet()) // every attempt succeeds with the next integer: 1, 2, 3, ...
+      }
+
+      val retried = retry(() => attempt(), (t: Int, _) => t < 5, 10, 100 
milliseconds) // the predicate asks for another attempt while the value is below 5
+
+      within(3 seconds) {
+        Await.result(retried, remaining) should ===(5) // first value the predicate no longer wants retried
+      }
+    }
+
+    "be able to skip retry with predicate for exception" in {
+      val counter = new AtomicInteger(0)
+
+      def attempt(): Future[Int] = {
+        counter.incrementAndGet() // records how many times attempt() was invoked
+        // always fails with an exception that the predicate below declines to retry
+        Future.failed(new IllegalArgumentException())
+      }
+
+      val retried =
+        retry(() => attempt(), (_: Int, e) => 
!e.isInstanceOf[IllegalArgumentException], 10, 100 milliseconds)
+
+      within(3 seconds) {
+        retried.failed.futureValue shouldBe an[IllegalArgumentException]
+        counter.get() should ===(1) // a single invocation: no retry was made
+      }
+    }
+
   }
 
 }
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala 
b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
index 09e90526e0..2b8759e026 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.pattern
 
 import java.util.Optional
 import java.util.concurrent.{ Callable, CompletionStage, TimeUnit }
+import java.util.function.BiPredicate
 
 import scala.concurrent.ExecutionContext
 
@@ -491,13 +492,42 @@ object Patterns {
    *
    * If attempts are exhausted the returned completion operator is simply the 
result of invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries
-   * and therefore must be thread safe (not touch unsafe mutable state).
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
    */
   def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, ec: 
ExecutionContext): CompletionStage[T] = {
     require(attempt != null, "Parameter attempt should not be null.")
     scalaRetry(() => attempt.call().asScala, attempts)(ec).asJava
   }
 
+  /**
+   * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made.
+   * This overload takes no delay, back-off or scheduler argument; use one of 
the other overloads to space out the attempts.
+   * The predicate has to cope with a `null` result as well as a `null` exception.
+   *
+   * If attempts are exhausted the returned completion stage is simply the 
result of invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent tries
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
+   *
+   * @param attempt       the function to be attempted
+   * @param shouldRetry   the predicate to determine if the attempt should be 
retried
+   * @param attempts      the maximum number of attempts
+   * @param ec            the execution context
+   * @return the result [[java.util.concurrent.CompletionStage]] which may 
have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](
+      attempt: Callable[CompletionStage[T]],
+      shouldRetry: BiPredicate[T, Throwable],
+      attempts: Int,
+      ec: ExecutionContext): CompletionStage[T] = {
+    require(attempt != null, "Parameter attempt should not be null.")
+    scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e), 
attempts)(ec).asJava
+  }
+
   /**
    * Returns an internally retrying [[java.util.concurrent.CompletionStage]]
    * The first attempt will be made immediately, each subsequent attempt will 
be made with a backoff time,
@@ -505,7 +535,7 @@ object Patterns {
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries and
-   * therefore must be thread safe (not touch unsafe mutable state).
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
    *
    * @param minBackoff   minimum (initial) duration until the child actor will
    *                     started again, if it is terminated
@@ -530,6 +560,50 @@ object Patterns {
       system.classicSystem.scheduler,
       system.classicSystem.dispatcher)
 
+  /**
+   * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after a back-off delay configured by 
`minBackoff`, `maxBackoff` and `randomFactor`.
+   * The scheduler and the dispatcher of the given actor system are used for the retries.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the dispatcher of the 
given actor system for subsequent tries and
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
+   *
+   * @param attempt      the function to be attempted
+   * @param shouldRetry  the predicate to determine if the attempt should be 
retried
+   * @param attempts     the maximum number of attempts
+   * @param minBackoff   minimum (initial) back-off duration to wait before
+   *                     the next attempt is made
+   * @param maxBackoff   the exponential back-off is capped to this duration
+   * @param randomFactor after calculation of the exponential back-off an 
additional
+   *                     random delay based on this factor is added, e.g. 
`0.2` adds up to `20%` delay.
+   *                     In order to skip this additional delay pass in `0`.
+   * @param system       the actor system
+   * @return the result [[java.util.concurrent.CompletionStage]] which may 
have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](
+      attempt: Callable[CompletionStage[T]],
+      shouldRetry: BiPredicate[T, Throwable],
+      attempts: Int,
+      minBackoff: java.time.Duration,
+      maxBackoff: java.time.Duration,
+      randomFactor: Double,
+      system: ClassicActorSystemProvider): CompletionStage[T] =
+    retry(
+      attempt,
+      shouldRetry,
+      attempts,
+      minBackoff,
+      maxBackoff,
+      randomFactor,
+      system.classicSystem.scheduler,
+      system.classicSystem.dispatcher)
+
   /**
    * Returns an internally retrying [[java.util.concurrent.CompletionStage]]
    * The first attempt will be made immediately, each subsequent attempt will 
be made with a backoff time,
@@ -537,7 +611,7 @@ object Patterns {
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries and
-   * therefore must be thread safe (not touch unsafe mutable state).
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
    *
    * @param minBackoff   minimum (initial) duration until the child actor will
    *                     started again, if it is terminated
@@ -562,6 +636,53 @@ object Patterns {
       scheduler).asJava
   }
 
+  /**
+   * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after a back-off delay configured by 
`minBackoff`, `maxBackoff` and `randomFactor`.
+   * An [[java.lang.IllegalArgumentException]] is thrown if `attempt`, `minBackoff` or `maxBackoff` is `null`.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent tries and
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
+   *
+   * @param attempt      the function to be attempted
+   * @param shouldRetry  the predicate to determine if the attempt should be 
retried
+   * @param attempts     the maximum number of attempts
+   * @param minBackoff   minimum (initial) back-off duration to wait before
+   *                     the next attempt is made
+   * @param maxBackoff   the exponential back-off is capped to this duration
+   * @param randomFactor after calculation of the exponential back-off an 
additional
+   *                     random delay based on this factor is added, e.g. 
`0.2` adds up to `20%` delay.
+   *                     In order to skip this additional delay pass in `0`.
+   * @param scheduler    the scheduler for scheduling a delay
+   * @param ec           the execution context
+   * @return the result [[java.util.concurrent.CompletionStage]] which may 
have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](
+      attempt: Callable[CompletionStage[T]],
+      shouldRetry: BiPredicate[T, Throwable],
+      attempts: Int,
+      minBackoff: java.time.Duration,
+      maxBackoff: java.time.Duration,
+      randomFactor: Double,
+      scheduler: Scheduler,
+      ec: ExecutionContext): CompletionStage[T] = {
+    require(attempt != null, "Parameter attempt should not be null.")
+    require(minBackoff != null, "Parameter minBackoff should not be null.")
+    require(maxBackoff != null, "Parameter maxBackoff should not be null.")
+    scalaRetry(
+      () => attempt.call().asScala,
+      (t, e) => shouldRetry.test(t, e),
+      attempts, minBackoff.asScala, maxBackoff.asScala, randomFactor)(
+      ec,
+      scheduler).asJava
+  }
+
   /**
    * Returns an internally retrying [[scala.concurrent.Future]]
    * The first attempt will be made immediately, and each subsequent attempt 
will be made after 'delay'.
@@ -569,7 +690,7 @@ object Patterns {
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries and
-   * therefore must be thread safe (not touch unsafe mutable state).
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
    */
   def retry[T](
       attempt: Callable[Future[T]],
@@ -588,7 +709,7 @@ object Patterns {
    *
    * If attempts are exhausted the returned completion operator is simply the 
result of invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries
-   * and therefore must be thread safe (not touch unsafe mutable state).
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
    */
   def retry[T](
       attempt: Callable[CompletionStage[T]],
@@ -597,6 +718,35 @@ object Patterns {
       system: ClassicActorSystemProvider): CompletionStage[T] =
     retry(attempt, attempts, delay, system.classicSystem.scheduler, 
system.classicSystem.dispatcher)
 
+  /**
+   * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after the given 
`delay`.
+   * The scheduler and the dispatcher of the given actor system are used for the retries.
+   *
+   * If attempts are exhausted the returned completion stage is simply the 
result of invoking attempt.
+   * Note that the attempt function will be invoked on the dispatcher of the 
given actor system for subsequent tries
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
+   *
+   * @param attempt       the function to be attempted
+   * @param shouldRetry   the predicate to determine if the attempt should be 
retried
+   * @param attempts      the maximum number of attempts
+   * @param delay         the delay between each attempt
+   * @param system        the actor system
+   * @return the result [[java.util.concurrent.CompletionStage]] which may 
have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](
+      attempt: Callable[CompletionStage[T]],
+      shouldRetry: BiPredicate[T, Throwable],
+      attempts: Int,
+      delay: java.time.Duration,
+      system: ClassicActorSystemProvider): CompletionStage[T] =
+    retry(attempt, shouldRetry, attempts, delay, 
system.classicSystem.scheduler, system.classicSystem.dispatcher)
+
   /**
    * Returns an internally retrying [[java.util.concurrent.CompletionStage]]
    * The first attempt will be made immediately, and each subsequent attempt 
will be made after 'delay'.
@@ -604,7 +754,7 @@ object Patterns {
    *
    * If attempts are exhausted the returned completion operator is simply the 
result of invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries
-   * and therefore must be thread safe (not touch unsafe mutable state).
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
    */
   def retry[T](
       attempt: Callable[CompletionStage[T]],
@@ -616,21 +766,96 @@ object Patterns {
     scalaRetry(() => attempt.call().asScala, attempts, delay.asScala)(ec, 
scheduler).asJava
   }
 
+  /**
+   * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after the given 
`delay`.
+   * A scheduler (eg context.system.scheduler) must be provided to delay each retry.
+   *
+   * If attempts are exhausted the returned completion stage is simply the 
result of invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent tries
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
+   *
+   * @param attempt       the function to be attempted
+   * @param shouldRetry   the predicate to determine if the attempt should be 
retried
+   * @param attempts      the maximum number of attempts
+   * @param delay         the delay between each attempt
+   * @param scheduler     the scheduler for scheduling a delay
+   * @param ec            the execution context
+   * @return the result [[java.util.concurrent.CompletionStage]] which may 
have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](
+      attempt: Callable[CompletionStage[T]],
+      shouldRetry: BiPredicate[T, Throwable],
+      attempts: Int,
+      delay: java.time.Duration,
+      scheduler: Scheduler,
+      ec: ExecutionContext): CompletionStage[T] = {
+    require(attempt != null, "Parameter attempt should not be null.")
+    scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e), 
attempts, delay.asScala)(ec,
+      scheduler).asJava
+  }
+
   /**
    * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
    * The first attempt will be made immediately, each subsequent attempt will 
be made after
-   * the 'delay' return by `delayFunction`(the input next attempt count start 
from 1).
+   * the 'delay' returned by `delayFunction` (its input is the count of the next attempt, starting 
from 1).
+   * The function returns an empty [[Optional]] instance for no delay.
+   * A scheduler (eg context.system.scheduler) must be provided to delay each 
retry.
+   * You could provide a function to generate the next delay duration after 
first attempt,
+   * this function should never return `null`, otherwise an 
[[java.lang.IllegalArgumentException]] will be thrown.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent tries and
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
+   */
+  def retry[T](
+      attempt: Callable[CompletionStage[T]],
+      attempts: Int,
+      delayFunction: 
java.util.function.IntFunction[Optional[java.time.Duration]],
+      scheduler: Scheduler,
+      context: ExecutionContext): CompletionStage[T] = {
+    import pekko.util.OptionConverters._
+    require(attempt != null, "Parameter attempt should not be null.")
+    scalaRetry(
+      () => attempt.call().asScala,
+      attempts,
+      attempted => 
delayFunction.apply(attempted).toScala.map(_.asScala))(context, 
scheduler).asJava
+  }
+
+  /**
+   * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after the 'delay' returned by 
`delayFunction` (its input is the count of the next attempt, starting from 1).
    * Return an empty [[Optional]] instance for no delay.
+   *
    * A scheduler (eg context.system.scheduler) must be provided to delay each 
retry.
    * You could provide a function to generate the next delay duration after 
first attempt,
    * this function should never return `null`, otherwise an 
[[java.lang.IllegalArgumentException]] will be through.
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries and
-   * therefore must be thread safe (not touch unsafe mutable state).
+   * therefore must be thread safe (i.e. not touch unsafe mutable state).
+   *
+   * @param attempt       the function to be attempted
+   * @param shouldRetry   the predicate to determine if the attempt should be 
retried
+   * @param attempts      the maximum number of attempts
+   * @param delayFunction the function to generate the next delay duration, 
an empty [[Optional]] for no delay
+   * @param scheduler     the scheduler for scheduling a delay
+   * @param context       the execution context
+   * @return the result [[java.util.concurrent.CompletionStage]] which may 
have been retried
+   *
+   * @since 1.1.0
    */
   def retry[T](
       attempt: Callable[CompletionStage[T]],
+      shouldRetry: BiPredicate[T, Throwable],
       attempts: Int,
       delayFunction: 
java.util.function.IntFunction[Optional[java.time.Duration]],
       scheduler: Scheduler,
@@ -639,6 +864,7 @@ object Patterns {
     require(attempt != null, "Parameter attempt should not be null.")
     scalaRetry(
       () => attempt.call().asScala,
+      (t, e) => shouldRetry.test(t, e),
       attempts,
       attempted => 
delayFunction.apply(attempted).toScala.map(_.asScala))(context, 
scheduler).asJava
   }
@@ -1077,7 +1303,7 @@ object PatternsCS {
    * A scheduler (eg context.system.scheduler) must be provided to delay each 
retry
    * If attempts are exhausted the returned completion operator is simply the 
result of invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent tries
-   * and therefore must be thread safe (not touch unsafe mutable state).
+   * and therefore must be thread safe (i.e. not touch unsafe mutable state).
    */
   @deprecated("Use Patterns.retry instead.", since = "Akka 2.5.19")
   def retry[T](
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala 
b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
index 95bad97a73..2fccd0912e 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.pattern
 
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.util.{ Failure, Success }
 import scala.util.control.NonFatal
 
 import org.apache.pekko
@@ -33,7 +34,7 @@ trait RetrySupport {
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent
-   * tries and therefore must be thread safe (not touch unsafe mutable state).
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
    *
    * <b>Example usage:</b>
    *
@@ -46,6 +47,40 @@ trait RetrySupport {
     RetrySupport.retry(attempt, attempts, attempted = 0)
   }
 
+  /**
+   * Given a function from Unit to Future, returns an internally retrying 
Future.
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made.
+   * This overload takes no delay, back-off or scheduler argument; use one of 
the other overloads to space out the attempts.
+   * The predicate has to cope with a `null` result as well as a `null` exception.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
+   *
+   * <b>Example usage:</b>
+   *
+   * {{{
+   * def possiblyFailing(): Future[Something] = ???
+   * val shouldRetry: (Something, Throwable) => Boolean = (_, throwable) => throwable ne null
+   * val withRetry: Future[Something] = retry(attempt = possiblyFailing, 
shouldRetry, attempts = 10)
+   * }}}
+   *
+   * @param attempt     the function to be attempted
+   * @param shouldRetry the predicate to determine if the attempt should be 
retried
+   * @param attempts    the maximum number of attempts
+   * @param ec          the execution context
+   * @return the result future which may have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](attempt: () => Future[T],
+      shouldRetry: (T, Throwable) => Boolean,
+      attempts: Int)(implicit ec: ExecutionContext): Future[T] = {
+    RetrySupport.retry(attempt, shouldRetry, attempts, 
ConstantFun.scalaAnyToNone, attempted = 0)(ec, null) // this overload has no Scheduler to forward, hence `null`
+  }
+
   /**
    * Given a function from Unit to Future, returns an internally retrying 
Future.
    * The first attempt will be made immediately, each subsequent attempt will 
be made with a backoff time,
@@ -53,7 +88,7 @@ trait RetrySupport {
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent
-   * tries and therefore must be thread safe (not touch unsafe mutable state).
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
    *
    * <b>Example usage:</b>
    *
@@ -81,7 +116,64 @@ trait RetrySupport {
       minBackoff: FiniteDuration,
       maxBackoff: FiniteDuration,
       randomFactor: Double)(implicit ec: ExecutionContext, scheduler: 
Scheduler): Future[T] = {
-    require(attempt != null, "Parameter attempt should not be null.")
+    retry(
+      attempt,
+      RetrySupport.retryOnException,
+      attempts,
+      minBackoff,
+      maxBackoff,
+      randomFactor)
+  }
+
+  /**
+   * Given a function from Unit to Future, returns an internally retrying 
Future.
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after a back-off delay calculated from 
`minBackoff`, `maxBackoff` and `randomFactor`.
+   * A scheduler (eg context.system.scheduler) must be provided to delay each retry.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
+   *
+   * <b>Example usage:</b>
+   *
+   * {{{
+   * protected val sendAndReceive: HttpRequest => Future[HttpResponse]
+   * protected val shouldRetry: (HttpResponse, Throwable) => Boolean = (_, throwable) => throwable ne null
+   * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: 
HttpRequest) => retry[HttpResponse](
+   *   attempt = () => sendAndReceive(req),
+   *   shouldRetry,
+   *   attempts = 10,
+   *   minBackoff = 1.seconds,
+   *   maxBackoff = 2.seconds,
+   *   randomFactor = 0.5
+   * )
+   * }}}
+   *
+   * @param attempt     the function to be attempted
+   * @param shouldRetry the predicate to determine if the attempt should be 
retried
+   * @param attempts    the maximum number of attempts
+   * @param minBackoff   minimum (initial) back-off duration to wait before
+   *                     the next attempt is made
+   * @param maxBackoff   the exponential back-off is capped to this duration
+   * @param randomFactor after calculation of the exponential back-off an 
additional
+   *                     random delay based on this factor is added, e.g. 
`0.2` adds up to `20%` delay.
+   *                     In order to skip this additional delay pass in `0`.
+   * @param ec          the execution context
+   * @param scheduler   the scheduler for scheduling a delay
+   * @return the result future which may have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](
+      attempt: () => Future[T],
+      shouldRetry: (T, Throwable) => Boolean,
+      attempts: Int,
+      minBackoff: FiniteDuration,
+      maxBackoff: FiniteDuration,
+      randomFactor: Double)(implicit ec: ExecutionContext, scheduler: 
Scheduler): Future[T] = {
     require(minBackoff != null, "Parameter minBackoff should not be null.")
     require(maxBackoff != null, "Parameter maxBackoff should not be null.")
     require(minBackoff > Duration.Zero, "Parameter minBackoff must be > 0")
@@ -89,6 +181,7 @@ trait RetrySupport {
     require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be 
between 0.0 and 1.0")
     retry(
       attempt,
+      shouldRetry,
       attempts,
       attempted => Some(BackoffSupervisor.calculateDelay(attempted, 
minBackoff, maxBackoff, randomFactor)))
   }
@@ -100,7 +193,7 @@ trait RetrySupport {
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent
-   * tries and therefore must be thread safe (not touch unsafe mutable state).
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
    *
    * <b>Example usage:</b>
    *
@@ -119,18 +212,63 @@ trait RetrySupport {
     retry(attempt, attempts, _ => Some(delay))
   }
 
+  /**
+   * Given a function from Unit to Future, returns an internally retrying 
Future.
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made,
+   * each subsequent attempt will be made after the given `delay`, 
which is the same for every retry.
+   * A scheduler (eg context.system.scheduler) must be provided to delay each retry.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
+   *
+   * <b>Example usage:</b>
+   *
+   * {{{
+   * protected val sendAndReceive: HttpRequest => Future[HttpResponse]
+   * protected val shouldRetry: (HttpResponse, Throwable) => Boolean = (_, throwable) => throwable ne null
+   * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: 
HttpRequest) => retry[HttpResponse](
+   *   attempt = () => sendAndReceive(req),
+   *   shouldRetry,
+   *   attempts = 10,
+   *   delay = 2.seconds
+   * )
+   * }}}
+   *
+   * @param attempt     the function to be attempted
+   * @param shouldRetry the predicate to determine if the attempt should be 
retried
+   * @param attempts    the maximum number of attempts
+   * @param delay       the delay duration
+   * @param ec          the execution context
+   * @param scheduler   the scheduler for scheduling a delay
+   * @return the result future which may have been retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](attempt: () => Future[T],
+      shouldRetry: (T, Throwable) => Boolean,
+      attempts: Int,
+      delay: FiniteDuration)(
+      implicit ec: ExecutionContext,
+      scheduler: Scheduler): Future[T] = {
+    retry(attempt, shouldRetry, attempts, _ => Some(delay))
+  }
+
   /**
    * Given a function from Unit to Future, returns an internally retrying 
Future.
    * The first attempt will be made immediately, each subsequent attempt will 
be made after
-   * the 'delay' return by `delayFunction`(the input next attempt count start 
from 1).
+   * the 'delay' returned by `delayFunction` (its input is the count of the next attempt, starting 
from 1).
    * Returns [[scala.None]] for no delay.
+   *
    * A scheduler (eg context.system.scheduler) must be provided to delay each 
retry.
    * You could provide a function to generate the next delay duration after 
first attempt,
    * this function should never return `null`, otherwise an 
[[java.lang.IllegalArgumentException]] will be through.
    *
    * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
    * Note that the attempt function will be invoked on the given execution 
context for subsequent
-   * tries and therefore must be thread safe (not touch unsafe mutable state).
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
    *
    * <b>Example usage:</b>
    *
@@ -148,22 +286,70 @@ trait RetrySupport {
       implicit
       ec: ExecutionContext,
       scheduler: Scheduler): Future[T] = {
-    RetrySupport.retry(attempt, attempts, delayFunction, attempted = 0)
+    RetrySupport.retry(attempt, RetrySupport.retryOnException, attempts, 
delayFunction, attempted = 0)
+  }
+
+  /**
+   * Given a function from Unit to Future, returns an internally retrying 
Future.
+   *
+   * When the future is completed, the `shouldRetry` predicate is always 
invoked with the result (or `null` if none)
+   * and the exception (or `null` if none). If the `shouldRetry` predicate 
returns true, then a new attempt is made;
+   * each subsequent attempt will be made after the 'delay' returned by 
`delayFunction` (whose input, the next attempt count, starts from 1).
+   * The `delayFunction` should return [[scala.None]] for no delay.
+   *
+   * A scheduler (eg context.system.scheduler) must be provided to delay each 
retry.
+   * You could provide a function to generate the next delay duration after 
first attempt,
+   * this function should never return `null`, otherwise an 
[[java.lang.IllegalArgumentException]] will be thrown.
+   *
+   * If attempts are exhausted the returned future is simply the result of 
invoking attempt.
+   * Note that the attempt function will be invoked on the given execution 
context for subsequent
+   * tries and therefore must be thread safe (i.e. not touch unsafe mutable 
state).
+   *
+   * <b>Example usage:</b>
+   *
+   * //retry with back off
+   * {{{
+   * protected val sendAndReceive: HttpRequest => Future[HttpResponse]
+   * protected val shouldRetry: (HttpResponse, Throwable) => Boolean = (_, throwable) => throwable ne null
+   * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: 
HttpRequest) => retry[HttpResponse](
+   *   attempt = () => sendAndReceive(req),
+   *   shouldRetry,
+   *   attempts = 10,
+   *   delayFunction = attempted => Option(2.seconds * attempted)
+   * )
+   * }}}
+   *
+   * @param attempt     the function to be attempted
+   * @param shouldRetry the predicate to determine if the attempt should be 
retried
+   * @param attempts    the maximum number of attempts
+   * @param delayFunction the function to generate the next delay duration, 
`None` for no delay
+   * @param ec          the execution context
+   * @param scheduler   the scheduler for scheduling a delay
+   * @return the result future, which may be retried
+   *
+   * @since 1.1.0
+   */
+  def retry[T](attempt: () => Future[T],
+      shouldRetry: (T, Throwable) => Boolean,
+      attempts: Int,
+      delayFunction: Int => Option[FiniteDuration])(implicit ec: 
ExecutionContext, scheduler: Scheduler): Future[T] = {
+    RetrySupport.retry(attempt, shouldRetry, attempts, delayFunction, 
attempted = 0)
   }
 }
 
 object RetrySupport extends RetrySupport {
+  private val retryOnException: (Any, Throwable) => Boolean = (_: Any, e: 
Throwable) => e != null
 
   private def retry[T](attempt: () => Future[T], maxAttempts: Int, attempted: 
Int)(
       implicit ec: ExecutionContext): Future[T] =
-    retry(attempt, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec, 
null)
+    retry(attempt, retryOnException, maxAttempts, ConstantFun.scalaAnyToNone, 
attempted)(ec, null)
 
   private def retry[T](
       attempt: () => Future[T],
+      shouldRetry: (T, Throwable) => Boolean,
       maxAttempts: Int,
       delayFunction: Int => Option[FiniteDuration],
       attempted: Int)(implicit ec: ExecutionContext, scheduler: Scheduler): 
Future[T] = {
-
     def tryAttempt(): Future[T] = {
       try {
         attempt()
@@ -172,30 +358,34 @@ object RetrySupport extends RetrySupport {
       }
     }
 
-    require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.")
+    def doRetry(nextAttempt: Int): Future[T] = delayFunction(nextAttempt) 
match {
+      case Some(delay) =>
+        if (delay.length < 1)
+          retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
+        else
+          after(delay, scheduler) {
+            retry(attempt, shouldRetry, maxAttempts, delayFunction, 
nextAttempt)
+          }
+      case None =>
+        retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
+      case null =>
+        Future.failed(new IllegalArgumentException("The delayFunction of retry 
should not return null."))
+    }
+
     require(attempt != null, "Parameter attempt should not be null.")
+    require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.")
+    require(delayFunction != null, "Parameter delayFunction should not be 
null.")
+    require(attempted >= 0, "Parameter attempted must >= 0.")
+
     if (maxAttempts - attempted > 0) {
       val result = tryAttempt()
       if (result eq null)
         result
       else {
-        val nextAttempt = attempted + 1
-        result.recoverWith {
-          case NonFatal(_) =>
-            delayFunction(nextAttempt) match {
-              case Some(delay) =>
-                if (delay.length < 1)
-                  retry(attempt, maxAttempts, delayFunction, nextAttempt)
-                else
-                  after(delay, scheduler) {
-                    retry(attempt, maxAttempts, delayFunction, nextAttempt)
-                  }
-              case None =>
-                retry(attempt, maxAttempts, delayFunction, nextAttempt)
-              case null =>
-                Future.failed(new IllegalArgumentException("The delayFunction 
of retry should not return null."))
-            }
-
+        result.transformWith {
+          case Success(value) if shouldRetry(value, null)                      
  => doRetry(attempted + 1)
+          case Failure(e) if NonFatal(e) && shouldRetry(null.asInstanceOf[T], 
e) => doRetry(attempted + 1)
+          case _                                                               
  => result
         }
       }
 
diff --git a/docs/src/test/java/jdocs/future/FutureDocTest.java 
b/docs/src/test/java/jdocs/future/FutureDocTest.java
index 861ebd9fae..5627b1d4e1 100644
--- a/docs/src/test/java/jdocs/future/FutureDocTest.java
+++ b/docs/src/test/java/jdocs/future/FutureDocTest.java
@@ -85,4 +85,17 @@ public class FutureDocTest extends AbstractJavaTest {
 
     retriedFuture.toCompletableFuture().get(2, SECONDS);
   }
+
+  @Test
+  public void useRetryWithPredicate() throws Exception {
+    // #retry
+    Callable<CompletionStage<String>> attempt = () -> 
CompletableFuture.completedFuture("test");
+
+    CompletionStage<String> retriedFuture =
+        Patterns.retry(
+            attempt, (notUsed, e) -> e != null, 3, 
java.time.Duration.ofMillis(200), system);
+    // #retry
+
+    retriedFuture.toCompletableFuture().get(2, SECONDS);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to