This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0643592c34d2543ebaded223f934da697e1b8f2a Author: LanKhuat <khuatdang...@gmail.com> AuthorDate: Thu May 21 15:01:21 2020 +0700 JAMES-3184 Add RunningOptions --- .../apache/james/mailbox/indexer/ReIndexer.java | 24 +++++++++++---- .../mailbox/tools/indexer/ReIndexerImpl.java | 20 ++++++------- .../mailbox/tools/indexer/ReIndexerPerformer.java | 35 +++++++++++----------- .../mailbox/tools/indexer/ThrowsReIndexer.java | 10 +++---- .../tools/indexer/CassandraReIndexerImplTest.java | 2 +- .../mailbox/tools/indexer/ReIndexerImplTest.java | 13 ++++---- .../james/adapter/mailbox/ReIndexerManagement.java | 5 ++-- .../adapter/mailbox/ReIndexerManagementTest.java | 10 +++---- 8 files changed, 68 insertions(+), 51 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java index 90cf51f..2a015f7 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java @@ -28,18 +28,32 @@ import org.apache.james.task.Task; public interface ReIndexer { - Task reIndex(Username username) throws MailboxException; + class RunningOptions { + public static final RunningOptions DEFAULT = new RunningOptions(50); - Task reIndex(MailboxPath path) throws MailboxException; + private final int messagesPerSecond; - Task reIndex(MailboxId mailboxId) throws MailboxException; + public RunningOptions(int messagesPerSecond) { + this.messagesPerSecond = messagesPerSecond; + } - Task reIndex() throws MailboxException; + public int getMessagesPerSecond() { + return messagesPerSecond; + } + } + + Task reIndex(Username username, RunningOptions runningOptions) throws MailboxException; + + Task reIndex(MailboxPath path, RunningOptions runningOptions) throws MailboxException; + + Task reIndex(MailboxId mailboxId, RunningOptions runningOptions) throws MailboxException; + + Task reIndex(RunningOptions runningOptions) throws MailboxException; Task reIndex(MailboxPath path, MessageUid uid) throws MailboxException; Task reIndex(MailboxId mailboxId, MessageUid uid) throws MailboxException; - Task reIndex(ReIndexingExecutionFailures previousFailures) throws MailboxException; + Task reIndex(ReIndexingExecutionFailures previousFailures, RunningOptions runningOptions) throws MailboxException; } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java index bb08369..f09050a 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java @@ -56,29 +56,29 @@ public class ReIndexerImpl implements ReIndexer { } @Override - public Task reIndex(MailboxPath path) throws MailboxException { + public Task reIndex(MailboxPath path, RunningOptions runningOptions) throws MailboxException { MailboxSession mailboxSession = mailboxManager.createSystemSession(path.getUser()); MailboxId mailboxId = mailboxManager.getMailbox(path, mailboxSession).getId(); - return new SingleMailboxReindexingTask(reIndexerPerformer, mailboxId); + return new SingleMailboxReindexingTask(reIndexerPerformer, mailboxId, runningOptions); } @Override - public Task reIndex(MailboxId mailboxId) throws MailboxException { + public Task reIndex(MailboxId mailboxId, RunningOptions runningOptions) throws MailboxException { validateIdExists(mailboxId); - return new SingleMailboxReindexingTask(reIndexerPerformer, mailboxId); + return new SingleMailboxReindexingTask(reIndexerPerformer, mailboxId, runningOptions); } @Override - public Task reIndex() { - return new FullReindexingTask(reIndexerPerformer); + public Task reIndex(RunningOptions runningOptions) { + return new FullReindexingTask(reIndexerPerformer, runningOptions); } @Override - public Task reIndex(Username username) { - return new UserReindexingTask(reIndexerPerformer, username); + public Task reIndex(Username username, RunningOptions runningOptions) { + return new UserReindexingTask(reIndexerPerformer, username, runningOptions); } @Override @@ -98,8 +98,8 @@ public class ReIndexerImpl implements ReIndexer { } @Override - public Task reIndex(ReIndexingExecutionFailures previousFailures) { - return new ErrorRecoveryIndexationTask(reIndexerPerformer, previousFailures); + public Task reIndex(ReIndexingExecutionFailures previousFailures, RunningOptions runningOptions) { + return new ErrorRecoveryIndexationTask(reIndexerPerformer, previousFailures, runningOptions); } private void validateIdExists(MailboxId mailboxId) throws MailboxException { diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index 8efcb38..030b468 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -25,6 +25,7 @@ import org.apache.james.core.Username; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions; import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; @@ -53,8 +54,8 @@ public class ReIndexerPerformer { private static final int MESSAGE_CONCURRENCY = 50; private static final String RE_INDEXING = "re-indexing"; private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING); - public static final int NO_CONCURRENCY = 1; - public static final int NO_PREFETCH = 1; + private static final int NO_CONCURRENCY = 1; + private static final int NO_PREFETCH = 1; private final MailboxManager mailboxManager; private final ListeningMessageSearchIndex messageSearchIndex; @@ -69,32 +70,32 @@ public class ReIndexerPerformer { this.mailboxSessionMapperFactory = mailboxSessionMapperFactory; } - Mono<Result> reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) { + Mono<Result> reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) .findMailboxByIdReactive(mailboxId) - .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox)); + .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions)); } - private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox) { + Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) { + return Flux.fromIterable(previousReIndexingFailures.failures()) + .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), runningOptions.getMessagesPerSecond()) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)); + } + + private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox, RunningOptions runningOptions) { LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()); return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) .listAllMessageUids(mailbox) - .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), MESSAGE_CONCURRENCY) + .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), runningOptions.getMessagesPerSecond()) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)) .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()))); } - Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) { - return Flux.fromIterable(previousReIndexingFailures.failures()) - .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), MESSAGE_CONCURRENCY) - .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)); - } - private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) { MailboxId mailboxId = previousReIndexingFailure.getMailboxId(); MessageUid uid = previousReIndexingFailure.getUid(); @@ -107,17 +108,17 @@ public class ReIndexerPerformer { }); } - Mono<Result> reIndex(ReprocessingContext reprocessingContext) { + Mono<Result> reIndex(ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); LOGGER.info("Starting a full reindex"); return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() - .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox), NO_CONCURRENCY, NO_PREFETCH) + .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox, runningOptions), NO_CONCURRENCY, NO_PREFETCH) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)) .doFinally(any -> LOGGER.info("Full reindex finished")); } - Mono<Result> reIndex(Username username, ReprocessingContext reprocessingContext) { + Mono<Result> reIndex(Username username, ReprocessingContext reprocessingContext, RunningOptions runningOptions) { MailboxSession mailboxSession = mailboxManager.createSystemSession(username); LOGGER.info("Starting a reindex for user {}", username.asString()); @@ -125,7 +126,7 @@ public class ReIndexerPerformer { return mailboxManager.searchReactive(mailboxQuery, mailboxSession) .map(MailboxMetaData::getId) - .flatMap(id -> reIndex(id, reprocessingContext), NO_CONCURRENCY, NO_PREFETCH) + .flatMap(id -> reIndex(id, reprocessingContext, runningOptions), NO_CONCURRENCY, NO_PREFETCH) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)) .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java index 1d17fcf..b0ec105 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ThrowsReIndexer.java @@ -30,22 +30,22 @@ import org.apache.james.task.Task; public class ThrowsReIndexer implements ReIndexer { @Override - public Task reIndex(MailboxPath path) throws MailboxException { + public Task reIndex(MailboxPath path, RunningOptions runningOptions) throws MailboxException { throw new MailboxException("Not implemented"); } @Override - public Task reIndex(MailboxId mailboxId) throws MailboxException { + public Task reIndex(MailboxId mailboxId, RunningOptions runningOptions) throws MailboxException { throw new MailboxException("Not implemented"); } @Override - public Task reIndex() throws MailboxException { + public Task reIndex(RunningOptions runningOptions) throws MailboxException { throw new MailboxException("Not implemented"); } @Override - public Task reIndex(Username username) throws MailboxException { + public Task reIndex(Username username, RunningOptions runningOptions) throws MailboxException { throw new MailboxException("Not implemented"); } @@ -60,7 +60,7 @@ public class ThrowsReIndexer implements ReIndexer { } @Override - public Task reIndex(ReIndexingExecutionFailures previousFailures) throws MailboxException { + public Task reIndex(ReIndexingExecutionFailures previousFailures, RunningOptions runningOptions) throws MailboxException { throw new MailboxException("Not implemented"); } } diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java index c147110..55ad6ed 100644 --- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java +++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java @@ -97,7 +97,7 @@ public class CassandraReIndexerImplTest { .runSuccessfullyWithin(Duration.ofMinutes(10)); // When We re-index - reIndexer.reIndex(INBOX).run(); + reIndexer.reIndex(INBOX, ReIndexer.RunningOptions.DEFAULT).run(); // The indexer is called for each message verify(messageSearchIndex).deleteAll(any(MailboxSession.class), any(MailboxId.class)); diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java index d5b15e7..237dbb5 100644 --- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java +++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java @@ -33,6 +33,7 @@ import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.exception.MailboxNotFoundException; import org.apache.james.mailbox.indexer.ReIndexer; +import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions; import org.apache.james.mailbox.inmemory.InMemoryId; import org.apache.james.mailbox.inmemory.InMemoryMailboxManager; import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; @@ -78,7 +79,7 @@ public class ReIndexerImplTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - reIndexer.reIndex(INBOX).run(); + reIndexer.reIndex(INBOX, RunningOptions.DEFAULT).run(); ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class); ArgumentCaptor<MailboxId> mailboxCaptor1 = ArgumentCaptor.forClass(MailboxId.class); @@ -105,7 +106,7 @@ public class ReIndexerImplTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - reIndexer.reIndex().run(); + reIndexer.reIndex(RunningOptions.DEFAULT).run(); ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class); ArgumentCaptor<MailboxId> mailboxCaptor1 = ArgumentCaptor.forClass(MailboxId.class); ArgumentCaptor<Mailbox> mailboxCaptor2 = ArgumentCaptor.forClass(Mailbox.class); @@ -131,7 +132,7 @@ public class ReIndexerImplTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - reIndexer.reIndex(USERNAME).run(); + reIndexer.reIndex(USERNAME, RunningOptions.DEFAULT).run(); ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class); ArgumentCaptor<MailboxId> mailboxCaptor1 = ArgumentCaptor.forClass(MailboxId.class); ArgumentCaptor<Mailbox> mailboxCaptor2 = ArgumentCaptor.forClass(Mailbox.class); @@ -223,7 +224,7 @@ public class ReIndexerImplTest { MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), systemSession).getId(); - reIndexer.reIndex(mailboxId).run(); + reIndexer.reIndex(mailboxId, RunningOptions.DEFAULT).run(); ArgumentCaptor<MailboxMessage> messageCaptor = ArgumentCaptor.forClass(MailboxMessage.class); ArgumentCaptor<MailboxId> mailboxIdCaptor = ArgumentCaptor.forClass(MailboxId.class); ArgumentCaptor<Mailbox> mailboxCaptor = ArgumentCaptor.forClass(Mailbox.class); @@ -245,7 +246,7 @@ public class ReIndexerImplTest { MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME); MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get(); - reIndexer.reIndex(mailboxId).run(); + reIndexer.reIndex(mailboxId, RunningOptions.DEFAULT).run(); ArgumentCaptor<MailboxId> mailboxCaptor = ArgumentCaptor.forClass(MailboxId.class); verify(messageSearchIndex).deleteAll(any(MailboxSession.class), mailboxCaptor.capture()); @@ -258,7 +259,7 @@ public class ReIndexerImplTest { void mailboxIdReIndexShouldFailWhenMailboxNotFound() { MailboxId mailboxId = InMemoryId.of(42); - assertThatThrownBy(() -> reIndexer.reIndex(mailboxId)) + assertThatThrownBy(() -> reIndexer.reIndex(mailboxId, RunningOptions.DEFAULT)) .isInstanceOf(MailboxNotFoundException.class); } } diff --git a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java index 2b621b5..3cf9675 100644 --- a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java +++ b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java @@ -29,6 +29,7 @@ import javax.inject.Named; import org.apache.james.core.Username; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexer; +import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; @@ -52,7 +53,7 @@ public class ReIndexerManagement implements ReIndexerManagementMBean { .addContext(MDCBuilder.PROTOCOL, "CLI") .addContext(MDCBuilder.ACTION, "reIndex") .build()) { - TaskId taskId = taskManager.submit(reIndexer.reIndex(new MailboxPath(namespace, Username.of(user), name))); + TaskId taskId = taskManager.submit(reIndexer.reIndex(new MailboxPath(namespace, Username.of(user), name), RunningOptions.DEFAULT)); taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE)); } catch (IOException | TaskManager.ReachedTimeoutException e) { throw new RuntimeException(e); @@ -66,7 +67,7 @@ public class ReIndexerManagement implements ReIndexerManagementMBean { .addContext(MDCBuilder.PROTOCOL, "CLI") .addContext(MDCBuilder.ACTION, "reIndex") .build()) { - TaskId taskId = taskManager.submit(reIndexer.reIndex()); + TaskId taskId = taskManager.submit(reIndexer.reIndex(RunningOptions.DEFAULT)); taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE)); } catch (IOException | TaskManager.ReachedTimeoutException e) { throw new RuntimeException(e); diff --git a/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java b/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java index 815d33f..caaf162 100644 --- a/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java +++ b/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java @@ -27,8 +27,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.james.core.Username; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexer; +import org.apache.james.mailbox.indexer.ReIndexer.RunningOptions; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.task.Hostname; import org.apache.james.task.MemoryTaskManager; @@ -56,11 +56,11 @@ public class ReIndexerManagementTest { String namespace = "namespace"; String user = "user"; String name = "name"; - when(reIndexer.reIndex(any(MailboxPath.class))).thenReturn(task); + when(reIndexer.reIndex(any(MailboxPath.class), any(RunningOptions.class))).thenReturn(task); assertThat(taskManager.list()).isEmpty(); testee.reIndex(namespace, user, name); - verify(reIndexer).reIndex(new MailboxPath(namespace, Username.of(user), name)); + verify(reIndexer).reIndex(new MailboxPath(namespace, Username.of(user), name), RunningOptions.DEFAULT); assertThat(taskManager.list()).hasSize(1); } @@ -68,11 +68,11 @@ public class ReIndexerManagementTest { void reIndexShouldWaitsForExecution() throws Exception { Task task = mock(Task.class); doReturn(Task.Result.COMPLETED).when(task).run(); - when(reIndexer.reIndex()).thenReturn(task); + when(reIndexer.reIndex(any(RunningOptions.class))).thenReturn(task); assertThat(taskManager.list()).isEmpty(); testee.reIndex(); - verify(reIndexer).reIndex(); + verify(reIndexer).reIndex(RunningOptions.DEFAULT); assertThat(taskManager.list()).hasSize(1); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org