This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aa4623c4142e6e42207723e6ba01cd6e5f79aa3a Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Thu May 28 10:43:31 2020 +0700 JAMES-3184 Use ReactorUtils Throttler --- .../apache/mailbox/tools/indexer/ReIndexerPerformer.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index 390a705..e9b08d3 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -44,6 +44,7 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.apache.james.task.Task; import org.apache.james.task.Task.Result; +import org.apache.james.util.ReactorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,6 @@ import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; public class ReIndexerPerformer { private static class ReIndexingEntry { @@ -83,7 +83,6 @@ public class ReIndexerPerformer { private static final int SINGLE_MESSAGE = 1; private static final String RE_INDEXING = "re-indexing"; private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING); - private static final Duration DELAY = Duration.ofSeconds(0); private final MailboxManager mailboxManager; private final ListeningMessageSearchIndex messageSearchIndex; @@ -206,8 +205,9 @@ public class ReIndexerPerformer { } private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) { - return throttle(entriesToIndex, Duration.ofSeconds(1), runningOptions.getMessagesPerSecond()) - .flatMap(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()), runningOptions.getMessagesPerSecond()) + return ReactorUtils.Throttler.<ReIndexingEntry, Task.Result>forOperation(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage())) + .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1)) + .throttle(entriesToIndex) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)); } @@ -222,10 +222,4 @@ public class ReIndexerPerformer { return Mono.just(Result.PARTIAL); }); } - - private <V> Flux<V> throttle(Flux<V> flux, Duration windowDuration, int windowMaxSize) { - return flux.windowTimeout(windowMaxSize, windowDuration) - .zipWith(Flux.interval(DELAY, windowDuration)) - .flatMap(Tuple2::getT1); - } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org