Repository: james-project Updated Branches: refs/heads/master fbb8b42b8 -> f3140967a
JAMES-1945 Async retrier for Cassandra Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9200405d Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9200405d Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9200405d Branch: refs/heads/master Commit: 9200405d39805a0494d4e366545817f958af5cf2 Parents: fbb8b42 Author: benwa <[email protected]> Authored: Wed Feb 22 14:26:32 2017 +0700 Committer: benwa <[email protected]> Committed: Thu Feb 23 10:37:37 2017 +0700 ---------------------------------------------------------------------- .../utils/FunctionRunnerWithRetry.java | 25 +++++- .../utils/FunctionRunnerWithRetryTest.java | 89 ++++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/9200405d/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java index 314b81d..86e1aba 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java @@ -19,14 +19,18 @@ package org.apache.james.backends.cassandra.utils; -import com.google.common.base.Preconditions; - import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import java.util.stream.IntStream; +import com.google.common.base.Preconditions; + public class FunctionRunnerWithRetry { + public static class RelayingException extends RuntimeException {} + @FunctionalInterface public interface OptionalSupplier<T> { Optional<T> getAsOptional(); @@ -54,4 +58,21 @@ public class FunctionRunnerWithRetry { .orElseThrow(() -> new LightweightTransactionException(maxRetry)) .get(); } + + public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier) { + return executeAsyncAndRetrieveObject(futureSupplier, 0); + } + + public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier, int tries) { + if (tries >= maxRetry) { + return CompletableFuture.completedFuture(Optional.empty()); + } + return futureSupplier.get() + .thenCompose(optional -> { + if (optional.isPresent()) { + return CompletableFuture.completedFuture(optional); + } + return executeAsyncAndRetrieveObject(futureSupplier, tries + 1); + }); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/9200405d/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java index 2f85653..a2ae9a4 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java @@ -21,6 +21,10 @@ package org.apache.james.backends.cassandra.utils; import static org.assertj.core.api.Assertions.assertThat; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.lang.mutable.MutableInt; import org.junit.Test; @@ -80,5 +84,90 @@ public class FunctionRunnerWithRetryTest { ); assertThat(value.getValue()).isEqualTo(MAX_RETRY); } + + @Test + public void asyncFunctionRunnerShouldWorkIfSucceedFirstTry() throws Exception { + int value = 18; + + Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY) + .executeAsyncAndRetrieveObject( + () -> CompletableFuture.completedFuture(Optional.of(value))) + .join(); + + assertThat(result).contains(value); + } + + @Test + public void asyncFunctionRunnerShouldTryOnlyOnceIfSuccess() throws Exception { + int value = 18; + AtomicInteger times = new AtomicInteger(0); + + new FunctionRunnerWithRetry(MAX_RETRY) + .executeAsyncAndRetrieveObject( + () -> { + times.incrementAndGet(); + return CompletableFuture.completedFuture(Optional.of(value)); + }) + .join(); + + assertThat(times.get()).isEqualTo(1); + } + + @Test + public void asyncFunctionRunnerShouldRetrieveValueOnRetry() throws Exception { + int value = 18; + AtomicInteger times = new AtomicInteger(0); + + Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY) + .executeAsyncAndRetrieveObject( + () -> { + int attemptCount = times.incrementAndGet(); + if (attemptCount == MAX_RETRY) { + return CompletableFuture.completedFuture(Optional.of(value)); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }) + .join(); + + assertThat(result).contains(value); + } + + @Test + public void asyncFunctionRunnerShouldMakeMaxRetryAttempts() throws Exception { + int value = 18; + AtomicInteger times = new AtomicInteger(0); + + new FunctionRunnerWithRetry(MAX_RETRY) + .executeAsyncAndRetrieveObject( + () -> { + int attemptCount = times.incrementAndGet(); + if (attemptCount == MAX_RETRY) { + return CompletableFuture.completedFuture(Optional.of(value)); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }) + .join(); + + assertThat(times.get()).isEqualTo(MAX_RETRY); + } + + + @Test + public void asyncFunctionRunnerShouldReturnEmptyIfAllFailed() throws Exception { + AtomicInteger times = new AtomicInteger(0); + + Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY) + .executeAsyncAndRetrieveObject( + () -> { + times.incrementAndGet(); + return CompletableFuture.completedFuture(Optional.<Integer>empty()); + }) + .join(); + + assertThat(result).isEmpty(); + assertThat(times.get()).isEqualTo(MAX_RETRY); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
