This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit edb8b089dd6f7fc9083525873f60858150666708 Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Mon May 25 15:09:16 2020 +0700 JAMES-3184 Methods name clarification & reordering --- .../tools/indexer/ErrorRecoveryIndexationTask.java | 2 +- .../mailbox/tools/indexer/FullReindexingTask.java | 2 +- .../tools/indexer/MessageIdReIndexingTask.java | 2 +- .../mailbox/tools/indexer/ReIndexerPerformer.java | 87 +++++++++++----------- .../tools/indexer/SingleMailboxReindexingTask.java | 2 +- .../tools/indexer/SingleMessageReindexingTask.java | 2 +- .../mailbox/tools/indexer/UserReindexingTask.java | 2 +- 7 files changed, 49 insertions(+), 50 deletions(-) diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java index 8517465..df20874 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java @@ -81,7 +81,7 @@ public class ErrorRecoveryIndexationTask implements Task { @Override public Result run() { - return reIndexerPerformer.reIndex(reprocessingContext, previousFailures, runningOptions).block(); + return reIndexerPerformer.reIndexErrors(reprocessingContext, previousFailures, runningOptions).block(); } @Override diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java index e841d12..b99eb5f 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java @@ -48,7 +48,7 @@ public class FullReindexingTask implements Task { @Override public Result run() { - return reIndexerPerformer.reIndex(reprocessingContext, runningOptions) + return reIndexerPerformer.reIndexAllMessages(reprocessingContext, runningOptions) .onErrorResume(e -> Mono.just(Result.PARTIAL)) .block(); } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java index 191d0ab..bc1c97a 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java @@ -78,7 +78,7 @@ public class MessageIdReIndexingTask implements Task { @Override public Result run() { - return reIndexerPerformer.handleMessageIdReindexing(messageId).block(); + return reIndexerPerformer.reIndexMessageId(messageId).block(); } @Override diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index 030b468..433e975 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -51,7 +51,6 @@ public class ReIndexerPerformer { private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class); private static final int SINGLE_MESSAGE = 1; - private static final int MESSAGE_CONCURRENCY = 50; private static final String RE_INDEXING = "re-indexing"; private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING); private static final int NO_CONCURRENCY = 1; @@ -70,45 +69,7 @@ public class ReIndexerPerformer { this.mailboxSessionMapperFactory = mailboxSessionMapperFactory; } - Mono<Result> reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { - MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - - return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) - .findMailboxByIdReactive(mailboxId) - .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions)); - } - - Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) { - return Flux.fromIterable(previousReIndexingFailures.failures()) - .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), runningOptions.getMessagesPerSecond()) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)); - } - - private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox, RunningOptions runningOptions) { - LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()); - return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) - .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .listAllMessageUids(mailbox) - .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), runningOptions.getMessagesPerSecond()) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)) - .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()))); - } - - private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) { - MailboxId mailboxId = previousReIndexingFailure.getMailboxId(); - MessageUid uid = previousReIndexingFailure.getUid(); - - return handleMessageReIndexing(mailboxId, uid, reprocessingContext) - .onErrorResume(e -> { - LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e); - reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid); - return Mono.just(Result.PARTIAL); - }); - } - - Mono<Result> reIndex(ReprocessingContext reprocessingContext, RunningOptions runningOptions) { + Mono<Result> reIndexAllMessages(ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); LOGGER.info("Starting a full reindex"); return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() @@ -118,7 +79,15 @@ public class ReIndexerPerformer { .doFinally(any -> LOGGER.info("Full reindex finished")); } - Mono<Result> reIndex(Username username, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { + Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { + MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); + + return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) + .findMailboxByIdReactive(mailboxId) + .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions)); + } + + Mono<Result> reIndexUserMailboxes(Username username, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(username); LOGGER.info("Starting a reindex for user {}", username.asString()); @@ -126,13 +95,13 @@ public class ReIndexerPerformer { return mailboxManager.searchReactive(mailboxQuery, mailboxSession) .map(MailboxMetaData::getId) - .flatMap(id -> reIndex(id, reprocessingContext, runningOptions), NO_CONCURRENCY, NO_PREFETCH) + .flatMap(id -> reIndexSingleMailbox(id, reprocessingContext, runningOptions), NO_CONCURRENCY, NO_PREFETCH) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)) .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); } - Mono<Result> handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) { + Mono<Result> reIndexSingleMessage(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) @@ -140,7 +109,7 @@ public class ReIndexerPerformer { .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext)); } - Mono<Result> handleMessageIdReindexing(MessageId messageId) { + Mono<Result> reIndexMessageId(MessageId messageId) { MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); return mailboxSessionMapperFactory.getMessageIdMapper(session) @@ -154,6 +123,36 @@ public class ReIndexerPerformer { }); } + Mono<Result> reIndexErrors(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) { + return Flux.fromIterable(previousReIndexingFailures.failures()) + .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), runningOptions.getMessagesPerSecond()) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox, RunningOptions runningOptions) { + LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()); + return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) + .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .listAllMessageUids(mailbox) + .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), runningOptions.getMessagesPerSecond()) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()))); + } + + private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) { + MailboxId mailboxId = previousReIndexingFailure.getMailboxId(); + MessageUid uid = previousReIndexingFailure.getUid(); + + return reIndexSingleMessage(mailboxId, uid, reprocessingContext) + .onErrorResume(e -> { + LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e); + reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid); + return Mono.just(Result.PARTIAL); + }); + } + private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) { return mailboxSessionMapperFactory.getMailboxMapper(session) .findMailboxByIdReactive(mailboxMessage.getMailboxId()) diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java index 29dd231..bc474a3 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java @@ -87,7 +87,7 @@ public class SingleMailboxReindexingTask implements Task { @Override public Result run() { try { - return reIndexerPerformer.reIndex(mailboxId, reprocessingContext, runningOptions) + return reIndexerPerformer.reIndexSingleMailbox(mailboxId, reprocessingContext, runningOptions) .block(); } catch (Exception e) { return Result.PARTIAL; diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java index 08c014b..a5a787d 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java @@ -96,7 +96,7 @@ public class SingleMessageReindexingTask implements Task { @Override public Result run() { - return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext()) + return reIndexerPerformer.reIndexSingleMessage(mailboxId, uid, new ReprocessingContext()) .onErrorResume(e -> { LOGGER.warn("Error encountered while reindexing {} : {}", mailboxId, uid, e); return Mono.just(Result.PARTIAL); diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java index eb06e88..19cde9b 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java @@ -87,7 +87,7 @@ public class UserReindexingTask implements Task { @Override public Result run() { - return reIndexerPerformer.reIndex(username, reprocessingContext, runningOptions) + return reIndexerPerformer.reIndexUserMailboxes(username, reprocessingContext, runningOptions) .onErrorResume(e -> Mono.just(Result.PARTIAL)) .block(); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org