This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 11682778c23e4e8976223cf0e29398bfc193dd52 Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Mon May 25 15:32:01 2020 +0700 JAMES-3184 Throttling for reindexing tasks --- .../mailbox/tools/indexer/ReIndexerPerformer.java | 155 ++++++++++++++------- 1 file changed, 101 insertions(+), 54 deletions(-) diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index 433e975..390a705 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -19,6 +19,10 @@ package org.apache.mailbox.tools.indexer; +import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED; + +import java.time.Duration; + import javax.inject.Inject; import org.apache.james.core.Username; @@ -27,13 +31,14 @@ import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions; import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures; +import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures.ReIndexingFailure; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; -import org.apache.james.mailbox.model.MailboxMetaData; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; +import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; @@ -46,15 +51,39 @@ import 
com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; public class ReIndexerPerformer { + private static class ReIndexingEntry { + private final Mailbox mailbox; + private final MailboxSession mailboxSession; + private final MailboxMessage message; + + ReIndexingEntry(Mailbox mailbox, MailboxSession mailboxSession, MailboxMessage message) { + this.mailbox = mailbox; + this.mailboxSession = mailboxSession; + this.message = message; + } + + public Mailbox getMailbox() { + return mailbox; + } + + public MailboxMessage getMessage() { + return message; + } + + public MailboxSession getMailboxSession() { + return mailboxSession; + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class); private static final int SINGLE_MESSAGE = 1; private static final String RE_INDEXING = "re-indexing"; private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING); - private static final int NO_CONCURRENCY = 1; - private static final int NO_PREFETCH = 1; + private static final Duration DELAY = Duration.ofSeconds(0); private final MailboxManager mailboxManager; private final ListeningMessageSearchIndex messageSearchIndex; @@ -72,33 +101,41 @@ public class ReIndexerPerformer { Mono<Result> reIndexAllMessages(ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); LOGGER.info("Starting a full reindex"); - return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() - .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions), NO_CONCURRENCY, NO_PREFETCH) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)) + + Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() + .flatMap(mailbox -> 
reIndexingEntriesForMailbox(mailbox, mailboxSession)); + + return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) .doFinally(any -> LOGGER.info("Full reindex finished")); } Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) + Flux<ReIndexingEntry> entriesToIndex = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(mailboxId) - .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions)); + .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); + + return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext); } Mono<Result> reIndexUserMailboxes(Username username, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(username); + MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession); LOGGER.info("Starting a reindex for user {}", username.asString()); MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build(); - return mailboxManager.searchReactive(mailboxQuery, mailboxSession) - .map(MailboxMetaData::getId) - .flatMap(id -> reIndexSingleMailbox(id, reprocessingContext, runningOptions), NO_CONCURRENCY, NO_PREFETCH) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)) - .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); + try { + Flux<ReIndexingEntry> entriesToIndex = mailboxMapper.findMailboxWithPathLike(mailboxQuery.asUserBound()) + .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession)); + + return reIndexMessages(entriesToIndex, runningOptions, reprocessingContext) + .doFinally(any -> 
LOGGER.info("User {} reindex finished", username.asString())); + } catch (Exception e) { + LOGGER.error("Error fetching mailboxes for user: {}", username.asString()); + return Mono.just(Result.PARTIAL); + } } Mono<Result> reIndexSingleMessage(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) { @@ -106,7 +143,9 @@ public class ReIndexerPerformer { return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(mailboxId) - .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext)); + .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, uid) + .flatMap(message -> reIndexMessage(mailboxSession, mailbox, reprocessingContext, message))) + .switchIfEmpty(Mono.just(Result.COMPLETED)); } Mono<Result> reIndexMessageId(MessageId messageId) { @@ -124,33 +163,11 @@ public class ReIndexerPerformer { } Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) { - return Flux.fromIterable(previousReIndexingFailures.failures()) - .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), runningOptions.getMessagesPerSecond()) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)); - } - - private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox, RunningOptions runningOptions) { - LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()); - return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) - .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .listAllMessageUids(mailbox) - .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), runningOptions.getMessagesPerSecond()) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)) - .doFinally(any -> LOGGER.info("Finish to 
reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()))); - } - - private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) { - MailboxId mailboxId = previousReIndexingFailure.getMailboxId(); - MessageUid uid = previousReIndexingFailure.getUid(); - - return reIndexSingleMessage(mailboxId, uid, reprocessingContext) - .onErrorResume(e -> { - LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e); - reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid); - return Mono.just(Result.PARTIAL); - }); + return reIndexMessages( + Flux.fromIterable(previousReIndexingFailures.failures()) + .flatMap(this::createReindexingEntryFromFailure), + runningOptions, + reprocessingContext); } private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) { @@ -164,21 +181,51 @@ public class ReIndexerPerformer { }); } - private Mono<Result> handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) { - return fullyReadMessage(mailboxSession, mailbox, uid) - .flatMap(message -> messageSearchIndex.add(mailboxSession, mailbox, message)) - .thenReturn(Result.COMPLETED) + private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) { + return mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE) + .next(); + } + + private Mono<ReIndexingEntry> createReindexingEntryFromFailure(ReIndexingFailure failure) { + MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); + + return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) + .findMailboxByIdReactive(failure.getMailboxId()) + .flatMap(mailbox -> fullyReadMessage(mailboxSession, mailbox, failure.getUid()) + .map(message -> new 
ReIndexingEntry(mailbox, mailboxSession, message))); + } + + private Flux<ReIndexingEntry> reIndexingEntriesForMailbox(Mailbox mailbox, MailboxSession mailboxSession) { + MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(mailboxSession); + + return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) + .thenMany(messageMapper.listAllMessageUids(mailbox)) + .flatMap(uid -> messageMapper.findInMailboxReactive(mailbox, MessageRange.one(uid), MessageMapper.FetchType.Full, UNLIMITED)) + .map(message -> new ReIndexingEntry(mailbox, mailboxSession, message)); + } + + private Mono<Task.Result> reIndexMessages(Flux<ReIndexingEntry> entriesToIndex, RunningOptions runningOptions, ReprocessingContext reprocessingContext) { + return throttle(entriesToIndex, Duration.ofSeconds(1), runningOptions.getMessagesPerSecond()) + .flatMap(entry -> reIndexMessage(entry.getMailboxSession(), entry.getMailbox(), reprocessingContext, entry.getMessage()), runningOptions.getMessagesPerSecond()) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Mono<Task.Result> reIndexMessage(MailboxSession mailboxSession, Mailbox mailbox, ReprocessingContext reprocessingContext, MailboxMessage message) { + return Mono.fromCallable(() -> messageSearchIndex.add(mailboxSession, mailbox, message)) .doOnNext(any -> reprocessingContext.recordSuccess()) + .thenReturn(Result.COMPLETED) .onErrorResume(e -> { - LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e); - reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid); + LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), message.getUid(), e); + reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), message.getUid()); return Mono.just(Result.PARTIAL); }); } - private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) { - return 
mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE) - .next(); + private <V> Flux<V> throttle(Flux<V> flux, Duration windowDuration, int windowMaxSize) { + return flux.windowTimeout(windowMaxSize, windowDuration) + .zipWith(Flux.interval(DELAY, windowDuration)) + .flatMap(Tuple2::getT1); } -} +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org For additional commands, e-mail: server-dev-help@james.apache.org