This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 77532c1bb5 feat: Add retry with predicate (#1269)
77532c1bb5 is described below
commit 77532c1bb5d4e8078b3e031ecac03697edbc58fc
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Apr 23 20:34:16 2024 +0800
feat: Add retry with predicate (#1269)
---
.../scala/org/apache/pekko/pattern/RetrySpec.scala | 34 +++
.../scala/org/apache/pekko/pattern/Patterns.scala | 244 ++++++++++++++++++++-
.../org/apache/pekko/pattern/RetrySupport.scala | 244 ++++++++++++++++++---
docs/src/test/java/jdocs/future/FutureDocTest.java | 13 ++
4 files changed, 499 insertions(+), 36 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
index 607ed61d65..5aa9e093c3 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala
@@ -22,6 +22,8 @@ import org.apache.pekko
import pekko.actor.Scheduler
import pekko.testkit.PekkoSpec
+import java.util.concurrent.atomic.AtomicInteger
+
class RetrySpec extends PekkoSpec with RetrySupport {
implicit val ec: ExecutionContextExecutor = system.dispatcher
implicit val scheduler: Scheduler = system.scheduler
@@ -152,6 +154,38 @@ class RetrySpec extends PekkoSpec with RetrySupport {
Await.result(retried, remaining) should ===(5)
}
}
+
+ "be able to retry with predicate for value" in {
+ val counter = new AtomicInteger(0)
+ def attempt(): Future[Int] = {
+ Future.successful(counter.incrementAndGet())
+ }
+
+ val retried = retry(() => attempt(), (t: Int, _) => t < 5, 10, 100
milliseconds)
+
+ within(3 seconds) {
+ Await.result(retried, remaining) should ===(5)
+ }
+ }
+
+ "be able to skip retry with predicate for exception" in {
+ val counter = new AtomicInteger(0)
+
+ def attempt(): Future[Int] = {
+ counter.incrementAndGet()
+ // should not retry on this exception
+ Future.failed(new IllegalArgumentException())
+ }
+
+ val retried =
+ retry(() => attempt(), (_: Int, e) =>
!e.isInstanceOf[IllegalArgumentException], 10, 100 milliseconds)
+
+ within(3 seconds) {
+ retried.failed.futureValue shouldBe an[IllegalArgumentException]
+ counter.get() should ===(1)
+ }
+ }
+
}
}
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
index 09e90526e0..2b8759e026 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.pattern
import java.util.Optional
import java.util.concurrent.{ Callable, CompletionStage, TimeUnit }
+import java.util.function.BiPredicate
import scala.concurrent.ExecutionContext
@@ -491,13 +492,42 @@ object Patterns {
*
* If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries
- * and therefore must be thread safe (not touch unsafe mutable state).
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
*/
def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, ec:
ExecutionContext): CompletionStage[T] = {
require(attempt != null, "Parameter attempt should not be null.")
scalaRetry(() => attempt.call().asScala, attempts)(ec).asJava
}
+ /**
+ * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Return an empty [[Optional]] instance for no delay.
+ *
+ * If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent tries
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param ec the execution context
+ * @return the result [[java.util.concurrent.CompletionStage]] which maybe
retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](
+ attempt: Callable[CompletionStage[T]],
+ shouldRetry: BiPredicate[T, Throwable],
+ attempts: Int,
+ ec: ExecutionContext): CompletionStage[T] = {
+ require(attempt != null, "Parameter attempt should not be null.")
+ scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e),
attempts)(ec).asJava
+ }
+
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
* The first attempt will be made immediately, each subsequent attempt will
be made with a backoff time,
@@ -505,7 +535,7 @@ object Patterns {
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries and
- * therefore must be thread safe (not touch unsafe mutable state).
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -530,6 +560,50 @@ object Patterns {
system.classicSystem.scheduler,
system.classicSystem.dispatcher)
+ /**
+ * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Return an empty [[Optional]] instance for no delay.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent tries and
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param minBackoff minimum (initial) duration until the child actor will
+ * started again, if it is terminated
+ * @param maxBackoff the exponential back-off is capped to this duration
+ * @param randomFactor after calculation of the exponential back-off an
additional
+ * random delay based on this factor is added, e.g.
`0.2` adds up to `20%` delay.
+ * In order to skip this additional delay pass in `0`.
+ * @param system the actor system
+ * @return the result [[java.util.concurrent.CompletionStage]] which maybe
retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](
+ attempt: Callable[CompletionStage[T]],
+ shouldRetry: BiPredicate[T, Throwable],
+ attempts: Int,
+ minBackoff: java.time.Duration,
+ maxBackoff: java.time.Duration,
+ randomFactor: Double,
+ system: ClassicActorSystemProvider): CompletionStage[T] =
+ retry(
+ attempt,
+ shouldRetry,
+ attempts,
+ minBackoff,
+ maxBackoff,
+ randomFactor,
+ system.classicSystem.scheduler,
+ system.classicSystem.dispatcher)
+
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
* The first attempt will be made immediately, each subsequent attempt will
be made with a backoff time,
@@ -537,7 +611,7 @@ object Patterns {
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries and
- * therefore must be thread safe (not touch unsafe mutable state).
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
@@ -562,6 +636,53 @@ object Patterns {
scheduler).asJava
}
+ /**
+ * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Return an empty [[Optional]] instance for no delay.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent tries and
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param minBackoff minimum (initial) duration until the child actor will
+ * started again, if it is terminated
+ * @param maxBackoff the exponential back-off is capped to this duration
+ * @param randomFactor after calculation of the exponential back-off an
additional
+ * random delay based on this factor is added, e.g.
`0.2` adds up to `20%` delay.
+ * In order to skip this additional delay pass in `0`.
+ * @param scheduler the scheduler for scheduling a delay
+ * @param ec the execution context
+ * @return the result [[java.util.concurrent.CompletionStage]] which maybe
retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](
+ attempt: Callable[CompletionStage[T]],
+ shouldRetry: BiPredicate[T, Throwable],
+ attempts: Int,
+ minBackoff: java.time.Duration,
+ maxBackoff: java.time.Duration,
+ randomFactor: Double,
+ scheduler: Scheduler,
+ ec: ExecutionContext): CompletionStage[T] = {
+ require(attempt != null, "Parameter attempt should not be null.")
+ require(minBackoff != null, "Parameter minBackoff should not be null.")
+ require(maxBackoff != null, "Parameter minBackoff should not be null.")
+ scalaRetry(
+ () => attempt.call().asScala,
+ (t, e) => shouldRetry.test(t, e),
+ attempts, minBackoff.asScala, maxBackoff.asScala, randomFactor)(
+ ec,
+ scheduler).asJava
+ }
+
/**
* Returns an internally retrying [[scala.concurrent.Future]]
* The first attempt will be made immediately, and each subsequent attempt
will be made after 'delay'.
@@ -569,7 +690,7 @@ object Patterns {
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries and
- * therefore must be thread safe (not touch unsafe mutable state).
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
*/
def retry[T](
attempt: Callable[Future[T]],
@@ -588,7 +709,7 @@ object Patterns {
*
* If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries
- * and therefore must be thread safe (not touch unsafe mutable state).
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
*/
def retry[T](
attempt: Callable[CompletionStage[T]],
@@ -597,6 +718,35 @@ object Patterns {
system: ClassicActorSystemProvider): CompletionStage[T] =
retry(attempt, attempts, delay, system.classicSystem.scheduler,
system.classicSystem.dispatcher)
+ /**
+ * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Return an empty [[Optional]] instance for no delay.
+ *
+ * If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent tries
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param delay the delay between each attempt
+ * @param system the actor system
+ * @return the result [[java.util.concurrent.CompletionStage]] which maybe
retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](
+ attempt: Callable[CompletionStage[T]],
+ shouldRetry: BiPredicate[T, Throwable],
+ attempts: Int,
+ delay: java.time.Duration,
+ system: ClassicActorSystemProvider): CompletionStage[T] =
+ retry(attempt, shouldRetry, attempts, delay,
system.classicSystem.scheduler, system.classicSystem.dispatcher)
+
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
* The first attempt will be made immediately, and each subsequent attempt
will be made after 'delay'.
@@ -604,7 +754,7 @@ object Patterns {
*
* If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries
- * and therefore must be thread safe (not touch unsafe mutable state).
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
*/
def retry[T](
attempt: Callable[CompletionStage[T]],
@@ -616,21 +766,96 @@ object Patterns {
scalaRetry(() => attempt.call().asScala, attempts, delay.asScala)(ec,
scheduler).asJava
}
+ /**
+ * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Return an empty [[Optional]] instance for no delay.
+ *
+ * If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent tries
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param delay the delay between each attempt
+ * @param scheduler the scheduler for scheduling a delay
+ * @param ec the execution context
+ * @return the result [[java.util.concurrent.CompletionStage]] which maybe
retried *
+ *
+ * @since 1.1.0
+ */
+ def retry[T](
+ attempt: Callable[CompletionStage[T]],
+ shouldRetry: BiPredicate[T, Throwable],
+ attempts: Int,
+ delay: java.time.Duration,
+ scheduler: Scheduler,
+ ec: ExecutionContext): CompletionStage[T] = {
+ require(attempt != null, "Parameter attempt should not be null.")
+ scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e),
attempts, delay.asScala)(ec,
+ scheduler).asJava
+ }
+
/**
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
* The first attempt will be made immediately, each subsequent attempt will
be made after
- * the 'delay' return by `delayFunction`(the input next attempt count start
from 1).
+ * the 'delay' return by `delayFunction` (the input next attempt count start
from 1).
+ * Return an empty [[Optional]] instance for no delay.
+ * A scheduler (eg context.system.scheduler) must be provided to delay each
retry.
+ * You could provide a function to generate the next delay duration after
first attempt,
+ * this function should never return `null`, otherwise an
[[java.lang.IllegalArgumentException]] will be through.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent tries and
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
+ */
+ def retry[T](
+ attempt: Callable[CompletionStage[T]],
+ attempts: Int,
+ delayFunction:
java.util.function.IntFunction[Optional[java.time.Duration]],
+ scheduler: Scheduler,
+ context: ExecutionContext): CompletionStage[T] = {
+ import pekko.util.OptionConverters._
+ require(attempt != null, "Parameter attempt should not be null.")
+ scalaRetry(
+ () => attempt.call().asScala,
+ attempts,
+ attempted =>
delayFunction.apply(attempted).toScala.map(_.asScala))(context,
scheduler).asJava
+ }
+
+ /**
+ * Returns an internally retrying [[java.util.concurrent.CompletionStage]].
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
* Return an empty [[Optional]] instance for no delay.
+ *
* A scheduler (eg context.system.scheduler) must be provided to delay each
retry.
* You could provide a function to generate the next delay duration after
first attempt,
* this function should never return `null`, otherwise an
[[java.lang.IllegalArgumentException]] will be through.
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries and
- * therefore must be thread safe (not touch unsafe mutable state).
+ * therefore must be thread safe (i.e. not touch unsafe mutable state).
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param delayFunction the function to generate the next delay duration,
`None` for no delay
+ * @param scheduler the scheduler for scheduling a delay
+ * @param context the execution context
+ * @return the result [[java.util.concurrent.CompletionStage]] which maybe
retried
+ *
+ * @since 1.1.0
*/
def retry[T](
attempt: Callable[CompletionStage[T]],
+ shouldRetry: BiPredicate[T, Throwable],
attempts: Int,
delayFunction:
java.util.function.IntFunction[Optional[java.time.Duration]],
scheduler: Scheduler,
@@ -639,6 +864,7 @@ object Patterns {
require(attempt != null, "Parameter attempt should not be null.")
scalaRetry(
() => attempt.call().asScala,
+ (t, e) => shouldRetry.test(t, e),
attempts,
attempted =>
delayFunction.apply(attempted).toScala.map(_.asScala))(context,
scheduler).asJava
}
@@ -1077,7 +1303,7 @@ object PatternsCS {
* A scheduler (eg context.system.scheduler) must be provided to delay each
retry
* If attempts are exhausted the returned completion operator is simply the
result of invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent tries
- * and therefore must be thread safe (not touch unsafe mutable state).
+ * and therefore must be thread safe (i.e. not touch unsafe mutable state).
*/
@deprecated("Use Patterns.retry instead.", since = "Akka 2.5.19")
def retry[T](
diff --git a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
index 95bad97a73..2fccd0912e 100644
--- a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
+++ b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.pattern
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
import org.apache.pekko
@@ -33,7 +34,7 @@ trait RetrySupport {
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent
- * tries and therefore must be thread safe (not touch unsafe mutable state).
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
*
* <b>Example usage:</b>
*
@@ -46,6 +47,40 @@ trait RetrySupport {
RetrySupport.retry(attempt, attempts, attempted = 0)
}
+ /**
+ * Given a function from Unit to Future, returns an internally retrying
Future.
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Returns [[scala.None]] for no delay.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
+ *
+ * <b>Example usage:</b>
+ *
+ * {{{
+ * def possiblyFailing(): Future[Something] = ???
+ * val shouldRetry: (Something, Throwable) => throwable ne null
+ * val withRetry: Future[Something] = retry(attempt = possiblyFailing,
shouldRetry, attempts = 10)
+ * }}}
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param ec the execution context
+ * @return the result future which maybe retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](attempt: () => Future[T],
+ shouldRetry: (T, Throwable) => Boolean,
+ attempts: Int)(implicit ec: ExecutionContext): Future[T] = {
+ RetrySupport.retry(attempt, shouldRetry, attempts,
ConstantFun.scalaAnyToNone, attempted = 0)(ec, null)
+ }
+
/**
* Given a function from Unit to Future, returns an internally retrying
Future.
* The first attempt will be made immediately, each subsequent attempt will
be made with a backoff time,
@@ -53,7 +88,7 @@ trait RetrySupport {
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent
- * tries and therefore must be thread safe (not touch unsafe mutable state).
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
*
* <b>Example usage:</b>
*
@@ -81,7 +116,64 @@ trait RetrySupport {
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double)(implicit ec: ExecutionContext, scheduler:
Scheduler): Future[T] = {
- require(attempt != null, "Parameter attempt should not be null.")
+ retry(
+ attempt,
+ RetrySupport.retryOnException,
+ attempts,
+ minBackoff,
+ maxBackoff,
+ randomFactor)
+ }
+
+ /**
+ * Given a function from Unit to Future, returns an internally retrying
Future.
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Returns [[scala.None]] for no delay.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
+ *
+ * <b>Example usage:</b>
+ *
+ * {{{
+ * protected val sendAndReceive: HttpRequest => Future[HttpResponse]
+ * protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null
+ * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req:
HttpRequest) => retry[HttpResponse](
+ * attempt = () => sendAndReceive(req),
+ * shouldRetry,
+ * attempts = 10,
+ * minBackoff = 1.seconds,
+ * maxBackoff = 2.seconds,
+ * randomFactor = 0.5
+ * )
+ * }}}
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param minBackoff minimum (initial) duration until the child actor will
+ * started again, if it is terminated
+ * @param maxBackoff the exponential back-off is capped to this duration
+ * @param randomFactor after calculation of the exponential back-off an
additional
+ * random delay based on this factor is added, e.g.
`0.2` adds up to `20%` delay.
+ * In order to skip this additional delay pass in `0`.
+ * @param ec the execution context
+ * @param scheduler the scheduler for scheduling a delay
+ * @return the result future which maybe retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](
+ attempt: () => Future[T],
+ shouldRetry: (T, Throwable) => Boolean,
+ attempts: Int,
+ minBackoff: FiniteDuration,
+ maxBackoff: FiniteDuration,
+ randomFactor: Double)(implicit ec: ExecutionContext, scheduler:
Scheduler): Future[T] = {
require(minBackoff != null, "Parameter minBackoff should not be null.")
require(maxBackoff != null, "Parameter maxBackoff should not be null.")
require(minBackoff > Duration.Zero, "Parameter minBackoff must be > 0")
@@ -89,6 +181,7 @@ trait RetrySupport {
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be
between 0.0 and 1.0")
retry(
attempt,
+ shouldRetry,
attempts,
attempted => Some(BackoffSupervisor.calculateDelay(attempted,
minBackoff, maxBackoff, randomFactor)))
}
@@ -100,7 +193,7 @@ trait RetrySupport {
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent
- * tries and therefore must be thread safe (not touch unsafe mutable state).
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
*
* <b>Example usage:</b>
*
@@ -119,18 +212,63 @@ trait RetrySupport {
retry(attempt, attempts, _ => Some(delay))
}
+ /**
+ * Given a function from Unit to Future, returns an internally retrying
Future.
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Returns [[scala.None]] for no delay.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
+ *
+ * <b>Example usage:</b>
+ *
+ * {{{
+ * protected val sendAndReceive: HttpRequest => Future[HttpResponse]
+ * protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null
+ * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req:
HttpRequest) => retry[HttpResponse](
+ * attempt = () => sendAndReceive(req),
+ * shouldRetry,
+ * attempts = 10,
+ * delay = 2.seconds
+ * )
+ * }}}
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param delay the delay duration
+ * @param ec the execution context
+ * @param scheduler the scheduler for scheduling a delay
+ * @return the result future which maybe retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](attempt: () => Future[T],
+ shouldRetry: (T, Throwable) => Boolean,
+ attempts: Int,
+ delay: FiniteDuration)(
+ implicit ec: ExecutionContext,
+ scheduler: Scheduler): Future[T] = {
+ retry(attempt, shouldRetry, attempts, _ => Some(delay))
+ }
+
/**
* Given a function from Unit to Future, returns an internally retrying
Future.
* The first attempt will be made immediately, each subsequent attempt will
be made after
- * the 'delay' return by `delayFunction`(the input next attempt count start
from 1).
+ * the 'delay' return by `delayFunction` (the input next attempt count start
from 1).
* Returns [[scala.None]] for no delay.
+ *
* A scheduler (eg context.system.scheduler) must be provided to delay each
retry.
* You could provide a function to generate the next delay duration after
first attempt,
* this function should never return `null`, otherwise an
[[java.lang.IllegalArgumentException]] will be through.
*
* If attempts are exhausted the returned future is simply the result of
invoking attempt.
* Note that the attempt function will be invoked on the given execution
context for subsequent
- * tries and therefore must be thread safe (not touch unsafe mutable state).
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
*
* <b>Example usage:</b>
*
@@ -148,22 +286,70 @@ trait RetrySupport {
implicit
ec: ExecutionContext,
scheduler: Scheduler): Future[T] = {
- RetrySupport.retry(attempt, attempts, delayFunction, attempted = 0)
+ RetrySupport.retry(attempt, RetrySupport.retryOnException, attempts,
delayFunction, attempted = 0)
+ }
+
+ /**
+ * Given a function from Unit to Future, returns an internally retrying
Future.
+ *
+ * When the future is completed, the `shouldRetry` predicate is always been
invoked with the result (or `null` if none)
+ * and the exception (or `null` if none). If the `shouldRetry` predicate
returns true, then a new attempt is made,
+ * each subsequent attempt will be made after the 'delay' return by
`delayFunction` (the input next attempt count start from 1).
+ * Returns [[scala.None]] for no delay.
+ *
+ * A scheduler (eg context.system.scheduler) must be provided to delay each
retry.
+ * You could provide a function to generate the next delay duration after
first attempt,
+ * this function should never return `null`, otherwise an
[[java.lang.IllegalArgumentException]] will be through.
+ *
+ * If attempts are exhausted the returned future is simply the result of
invoking attempt.
+ * Note that the attempt function will be invoked on the given execution
context for subsequent
+ * tries and therefore must be thread safe (i.e. not touch unsafe mutable
state).
+ *
+ * <b>Example usage:</b>
+ *
+ * //retry with back off
+ * {{{
+ * protected val sendAndReceive: HttpRequest => Future[HttpResponse]
+ * protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null
+ * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req:
HttpRequest) => retry[HttpResponse](
+ * attempt = () => sendAndReceive(req),
+ * shouldRetry,
+ * attempts = 10,
+ * delayFunction = attempted => Option(2.seconds * attempted)
+ * )
+ * }}}
+ *
+ * @param attempt the function to be attempted
+ * @param shouldRetry the predicate to determine if the attempt should be
retried
+ * @param attempts the maximum number of attempts
+ * @param delayFunction the function to generate the next delay duration,
`None` for no delay
+ * @param ec the execution context
+ * @param scheduler the scheduler for scheduling a delay
+ * @return the result future which maybe retried
+ *
+ * @since 1.1.0
+ */
+ def retry[T](attempt: () => Future[T],
+ shouldRetry: (T, Throwable) => Boolean,
+ attempts: Int,
+ delayFunction: Int => Option[FiniteDuration])(implicit ec:
ExecutionContext, scheduler: Scheduler): Future[T] = {
+ RetrySupport.retry(attempt, shouldRetry, attempts, delayFunction,
attempted = 0)
}
}
object RetrySupport extends RetrySupport {
+ private val retryOnException: (Any, Throwable) => Boolean = (_: Any, e:
Throwable) => e != null
private def retry[T](attempt: () => Future[T], maxAttempts: Int, attempted:
Int)(
implicit ec: ExecutionContext): Future[T] =
- retry(attempt, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec,
null)
+ retry(attempt, retryOnException, maxAttempts, ConstantFun.scalaAnyToNone,
attempted)(ec, null)
private def retry[T](
attempt: () => Future[T],
+ shouldRetry: (T, Throwable) => Boolean,
maxAttempts: Int,
delayFunction: Int => Option[FiniteDuration],
attempted: Int)(implicit ec: ExecutionContext, scheduler: Scheduler):
Future[T] = {
-
def tryAttempt(): Future[T] = {
try {
attempt()
@@ -172,30 +358,34 @@ object RetrySupport extends RetrySupport {
}
}
- require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.")
+ def doRetry(nextAttempt: Int): Future[T] = delayFunction(nextAttempt)
match {
+ case Some(delay) =>
+ if (delay.length < 1)
+ retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
+ else
+ after(delay, scheduler) {
+ retry(attempt, shouldRetry, maxAttempts, delayFunction,
nextAttempt)
+ }
+ case None =>
+ retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
+ case null =>
+ Future.failed(new IllegalArgumentException("The delayFunction of retry
should not return null."))
+ }
+
require(attempt != null, "Parameter attempt should not be null.")
+ require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.")
+ require(delayFunction != null, "Parameter delayFunction should not be
null.")
+ require(attempted >= 0, "Parameter attempted must >= 0.")
+
if (maxAttempts - attempted > 0) {
val result = tryAttempt()
if (result eq null)
result
else {
- val nextAttempt = attempted + 1
- result.recoverWith {
- case NonFatal(_) =>
- delayFunction(nextAttempt) match {
- case Some(delay) =>
- if (delay.length < 1)
- retry(attempt, maxAttempts, delayFunction, nextAttempt)
- else
- after(delay, scheduler) {
- retry(attempt, maxAttempts, delayFunction, nextAttempt)
- }
- case None =>
- retry(attempt, maxAttempts, delayFunction, nextAttempt)
- case null =>
- Future.failed(new IllegalArgumentException("The delayFunction
of retry should not return null."))
- }
-
+ result.transformWith {
+ case Success(value) if shouldRetry(value, null)
=> doRetry(attempted + 1)
+ case Failure(e) if NonFatal(e) && shouldRetry(null.asInstanceOf[T],
e) => doRetry(attempted + 1)
+ case _
=> result
}
}
diff --git a/docs/src/test/java/jdocs/future/FutureDocTest.java
b/docs/src/test/java/jdocs/future/FutureDocTest.java
index 861ebd9fae..5627b1d4e1 100644
--- a/docs/src/test/java/jdocs/future/FutureDocTest.java
+++ b/docs/src/test/java/jdocs/future/FutureDocTest.java
@@ -85,4 +85,17 @@ public class FutureDocTest extends AbstractJavaTest {
retriedFuture.toCompletableFuture().get(2, SECONDS);
}
+
+ @Test
+ public void useRetryWithPredicate() throws Exception {
+ // #retry
+ Callable<CompletionStage<String>> attempt = () ->
CompletableFuture.completedFuture("test");
+
+ CompletionStage<String> retriedFuture =
+ Patterns.retry(
+ attempt, (notUsed, e) -> e != null, 3,
java.time.Duration.ofMillis(200), system);
+ // #retry
+
+ retriedFuture.toCompletableFuture().get(2, SECONDS);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]