This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa4623c4142e6e42207723e6ba01cd6e5f79aa3a
Author: LanKhuat <khuatdang...@gmail.com>
AuthorDate: Thu May 28 10:43:31 2020 +0700

    JAMES-3184 Use ReactorUtils Throttler
---
 .../apache/mailbox/tools/indexer/ReIndexerPerformer.java   | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 390a705..e9b08d3 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -44,6 +44,7 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.task.Task;
 import org.apache.james.task.Task.Result;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,6 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 public class ReIndexerPerformer {
     private static class ReIndexingEntry {
@@ -83,7 +83,6 @@ public class ReIndexerPerformer {
     private static final int SINGLE_MESSAGE = 1;
     private static final String RE_INDEXING = "re-indexing";
     private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING);
-    private static final Duration DELAY = Duration.ofSeconds(0);
 
     private final MailboxManager mailboxManager;
     private final ListeningMessageSearchIndex messageSearchIndex;
@@ -206,8 +205,9 @@ public class ReIndexerPerformer {
     }
 
     private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) {
-        return throttle(entriesToIndex, Duration.ofSeconds(1), runningOptions.getMessagesPerSecond())
-            .flatMap(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()), runningOptions.getMessagesPerSecond())
+        return ReactorUtils.Throttler.<ReIndexingEntry, Task.Result>forOperation(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()))
+            .window(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
+            .throttle(entriesToIndex)
             .reduce(Task::combine)
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
@@ -222,10 +222,4 @@ public class ReIndexerPerformer {
                 return Mono.just(Result.PARTIAL);
             });
     }
-
-    private <V> Flux<V> throttle(Flux<V> flux, Duration windowDuration, int windowMaxSize) {
-        return flux.windowTimeout(windowMaxSize, windowDuration)
-            .zipWith(Flux.interval(DELAY, windowDuration))
-            .flatMap(Tuple2::getT1);
-    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to