This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cd0783e6fe67f836d2ba866f6779bfef14c01d5c Author: LanKhuat <[email protected]> AuthorDate: Mon May 18 17:29:01 2020 +0700 JAMES-3184 Throttle message process rate --- .../task/SolveMessageInconsistenciesService.java | 57 +++++++++++++---- .../SolveMessageInconsistenciesServiceTest.java | 73 +++++++++++----------- 2 files changed, 81 insertions(+), 49 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java index a052681..6106458 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java @@ -21,6 +21,7 @@ package org.apache.james.mailbox.cassandra.mail.task; import static org.apache.james.util.ReactorUtils.publishIfPresent; +import java.time.Duration; import java.util.Collection; import java.util.Objects; import java.util.Optional; @@ -41,10 +42,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; public class SolveMessageInconsistenciesService { @@ -162,6 +165,23 @@ public class SolveMessageInconsistenciesService { } } + public static class RunningOptions { + + public static final RunningOptions DEFAULT = new RunningOptions(100); + + private final int messagesPerSecond; + + public RunningOptions(int messagesPerSecond) { + Preconditions.checkArgument(messagesPerSecond > 0, "'messagesPerSecond' must be strictly positive"); + + this.messagesPerSecond = messagesPerSecond; + } + + public int getMessagesPerSecond() { + return this.messagesPerSecond; + } + } + static class Context { static class Snapshot { public static Builder builder() { @@ -332,8 +352,8 @@ public class SolveMessageInconsistenciesService { } private Context(AtomicLong processedImapUidEntries, AtomicLong processedMessageIdEntries, AtomicLong addedMessageIdEntries, - AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries, Collection<ComposedMessageId> fixedInconsistencies, - Collection<ComposedMessageId> errors) { + AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries, + Collection<ComposedMessageId> fixedInconsistencies, Collection<ComposedMessageId> errors) { this.processedImapUidEntries = processedImapUidEntries; this.processedMessageIdEntries = processedMessageIdEntries; this.addedMessageIdEntries = addedMessageIdEntries; @@ -383,7 +403,9 @@ public class SolveMessageInconsistenciesService { } } - public static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class); + private static final Duration DELAY = Duration.ZERO; + private static final Duration PERIOD = Duration.ofSeconds(1); private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO; private final CassandraMessageIdDAO messageIdDAO; @@ -394,18 +416,24 @@ public class SolveMessageInconsistenciesService { this.messageIdDAO = messageIdDAO; } - Mono<Task.Result> fixMessageInconsistencies(Context context) { + Mono<Task.Result> fixMessageInconsistencies(Context context, RunningOptions runningOptions) { return Flux.concat( - fixInconsistenciesInMessageId(context), - fixInconsistenciesInImapUid(context)) + fixInconsistenciesInMessageId(context, runningOptions), + fixInconsistenciesInImapUid(context, runningOptions)) .reduce(Task.Result.COMPLETED, Task::combine); } - private Flux<Task.Result> fixInconsistenciesInImapUid(Context context) { - return messageIdToImapUidDAO.retrieveAllMessages() + private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) { + return throttle(messageIdToImapUidDAO.retrieveAllMessages(), runningOptions) .doOnNext(any -> context.incrementProcessedImapUidEntries()) - .concatMap(this::detectInconsistencyInImapUid) - .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO)); + .flatMap(this::detectInconsistencyInImapUid) + .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO)); + } + + private Flux<ComposedMessageIdWithMetaData> throttle(Flux<ComposedMessageIdWithMetaData> messages, RunningOptions runningOptions) { + return messages.windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1)) + .zipWith(Flux.interval(DELAY, PERIOD)) + .flatMap(Tuple2::getT1); } private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) { @@ -445,11 +473,14 @@ public class SolveMessageInconsistenciesService { .switchIfEmpty(Mono.just(NO_INCONSISTENCY)); } - private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) { + private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) { return messageIdDAO.retrieveAllMessages() .doOnNext(any -> context.incrementMessageIdEntries()) - .concatMap(this::detectInconsistencyInMessageId) - .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO)); + .windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1)) + .zipWith(Flux.interval(Duration.ofSeconds(1))) + .flatMap(Tuple2::getT1) + .flatMap(this::detectInconsistencyInMessageId) + .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO)); } private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java index 9b60935..086d105 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java @@ -39,6 +39,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO; import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Context; +import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.RunningOptions; import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; @@ -105,7 +106,7 @@ public class SolveMessageInconsistenciesServiceTest { @Test void fixMessageInconsistenciesShouldReturnCompletedWhenNoData() { - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -114,13 +115,13 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @Test void fixMailboxInconsistenciesShouldNotAlterStateWhenEmpty() { - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()).isEmpty(); @@ -133,7 +134,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -150,7 +151,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() { imapUidDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -167,7 +168,7 @@ public class SolveMessageInconsistenciesServiceTest { .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); Context context = new Context(); - Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache(); + Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache(); task.subscribe(); barrier.awaitCaller(); @@ -196,7 +197,7 @@ public class SolveMessageInconsistenciesServiceTest { .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); Context context = new Context(); - Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache(); + Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache(); task.subscribe(); barrier.awaitCaller(); @@ -217,7 +218,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldResolveInconsistentData() { imapUidDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block()) @@ -232,7 +233,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -241,7 +242,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block()) @@ -256,7 +257,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -265,7 +266,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block()) @@ -286,7 +287,7 @@ public class SolveMessageInconsistenciesServiceTest { .forever() .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)")); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -300,7 +301,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)")); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -314,7 +315,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,d2bee791-7e63-11ea-883c-95b84008f979,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)")); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_2, Optional.of(MAILBOX_ID)).collectList().block()) @@ -325,7 +326,7 @@ public class SolveMessageInconsistenciesServiceTest { } @Test - void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) { + void fixMessageInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) { Context context = new Context(); imapUidDAO.insert(MESSAGE_1).block(); @@ -335,7 +336,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId")); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -345,7 +346,7 @@ public class SolveMessageInconsistenciesServiceTest { } @Test - void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) { + void fixMessageInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) { Context context = new Context(); imapUidDAO.insert(MESSAGE_1).block(); @@ -355,7 +356,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -373,7 +374,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() { messageIdDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -391,7 +392,7 @@ public class SolveMessageInconsistenciesServiceTest { "WHERE mailboxId=:mailboxId AND uid=:uid;")); Context context = new Context(); - Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache(); + Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache(); task.subscribe(); barrier.awaitCaller(); @@ -412,7 +413,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMessageInconsistenciesShouldResolveInconsistentData() { messageIdDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -429,7 +430,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -440,7 +441,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -461,7 +462,7 @@ public class SolveMessageInconsistenciesServiceTest { .forever() .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -475,7 +476,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); - assertThat(testee.fixMessageInconsistencies(new Context()).block()) + assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -489,7 +490,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); - testee.fixMessageInconsistencies(new Context()).block(); + testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block(); SoftAssertions.assertSoftly(softly -> { softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()) @@ -510,7 +511,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid")); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -530,7 +531,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId")); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -545,7 +546,7 @@ public class SolveMessageInconsistenciesServiceTest { void fixMailboxInconsistenciesShouldNotUpdateContextWhenNoData() { Context context = new Context(); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new Context().snapshot()); } @@ -557,7 +558,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -572,7 +573,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -589,7 +590,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block(); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -607,7 +608,7 @@ public class SolveMessageInconsistenciesServiceTest { imapUidDAO.insert(MESSAGE_1).block(); messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block(); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -624,7 +625,7 @@ public class SolveMessageInconsistenciesServiceTest { messageIdDAO.insert(MESSAGE_1).block(); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() @@ -645,7 +646,7 @@ public class SolveMessageInconsistenciesServiceTest { .times(1) .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;")); - testee.fixMessageInconsistencies(context).block(); + testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(Context.Snapshot.builder() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
