This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f36ea102f8d67a538bdfc8d9afb4698b1ed95510
Author: LanKhuat <[email protected]>
AuthorDate: Thu May 28 11:12:38 2020 +0700

    JAMES-3184 Use ReactorUtils Throttler
---
 .../mail/task/SolveMessageInconsistenciesService.java | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index e25d6f3..89a90ea 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -38,6 +38,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.task.Task;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +48,6 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 public class SolveMessageInconsistenciesService {
 
@@ -424,18 +424,13 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
-        return throttle(messageIdToImapUidDAO.retrieveAllMessages(), runningOptions)
+        return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInImapUid)
+            .window(runningOptions.getMessagesPerSecond(), PERIOD)
+            .throttle(messageIdToImapUidDAO.retrieveAllMessages())
             .doOnNext(any -> context.incrementProcessedImapUidEntries())
-            .flatMap(this::detectInconsistencyInImapUid)
             .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
     }
 
-    private Flux<ComposedMessageIdWithMetaData> throttle(Flux<ComposedMessageIdWithMetaData> messages, RunningOptions runningOptions) {
-        return messages.windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
-            .zipWith(Flux.interval(DELAY, PERIOD))
-            .flatMap(Tuple2::getT1);
-    }
-
     private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
         return compareWithMessageIdRecord(message)
             .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message)));
@@ -474,9 +469,10 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
-        return throttle(messageIdDAO.retrieveAllMessages(), runningOptions)
+        return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInMessageId)
+            .window(runningOptions.getMessagesPerSecond(), PERIOD)
+            .throttle(messageIdDAO.retrieveAllMessages())
             .doOnNext(any -> context.incrementMessageIdEntries())
-            .flatMap(this::detectInconsistencyInMessageId)
             .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
