This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit af594f294a6ed1167d91817177a43605dac24810 Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 7 11:26:35 2020 +0700 JAMES-3143 SolveMessageInconsistenciesService: Delay confirmation read We don't need to perform the confirmation if there is no inconsistencies, enhencing run performances --- .../task/SolveMessageInconsistenciesService.java | 58 +++++++++++++++------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java index f0ec09b..a052681 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; +import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO; @@ -395,8 +396,8 @@ public class SolveMessageInconsistenciesService { Mono<Task.Result> fixMessageInconsistencies(Context context) { return Flux.concat( - fixInconsistenciesInMessageId(context), - fixInconsistenciesInImapUid(context)) + fixInconsistenciesInMessageId(context), + fixInconsistenciesInImapUid(context)) .reduce(Task.Result.COMPLETED, Task::combine); } @@ -408,22 +409,40 @@ public class SolveMessageInconsistenciesService { } private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) { - return messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) - .next() - .flatMap(this::compareWithMessageIdRecord) + return compareWithMessageIdRecord(message) .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message))); } - private Mono<Inconsistency> compareWithMessageIdRecord(ComposedMessageIdWithMetaData upToDateMessageFromImapUid) { - return messageIdDAO.retrieve((CassandraId) upToDateMessageFromImapUid.getComposedMessageId().getMailboxId(), upToDateMessageFromImapUid.getComposedMessageId().getUid()) + private Mono<Inconsistency> compareWithMessageIdRecord(ComposedMessageIdWithMetaData messageFromImapUid) { + CassandraId mailboxId = (CassandraId) messageFromImapUid.getComposedMessageId().getMailboxId(); + MessageUid uid = messageFromImapUid.getComposedMessageId().getUid(); + CassandraMessageId messageId = (CassandraMessageId) messageFromImapUid.getComposedMessageId().getMessageId(); + + return messageIdDAO.retrieve(mailboxId, uid) .handle(publishIfPresent()) - .map(messageIdRecord -> { - if (messageIdRecord.equals(upToDateMessageFromImapUid)) { - return NO_INCONSISTENCY; + .flatMap(messageIdRecord -> { + if (messageIdRecord.equals(messageFromImapUid)) { + return Mono.just(NO_INCONSISTENCY); } - return new OutdatedMessageIdEntry(messageIdRecord, upToDateMessageFromImapUid); + return detectOutdatedMessageIdEntry(mailboxId, messageId, messageIdRecord); }) - .switchIfEmpty(Mono.just(new OrphanImapUidEntry(upToDateMessageFromImapUid))); + .switchIfEmpty( + detectOrphanImapUidEntry(messageFromImapUid, mailboxId, messageId)); + } + + private Mono<Inconsistency> detectOutdatedMessageIdEntry(CassandraId mailboxId, CassandraMessageId messageId, ComposedMessageIdWithMetaData messageIdRecord) { + return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId)) + .filter(upToDateMessageFromImapUid -> !upToDateMessageFromImapUid.equals(messageIdRecord)) + .<Inconsistency>map(upToDateMessageFromImapUid -> new OutdatedMessageIdEntry(messageIdRecord, upToDateMessageFromImapUid)) + .next() + .switchIfEmpty(Mono.just(NO_INCONSISTENCY)); + } + + private Mono<Inconsistency> detectOrphanImapUidEntry(ComposedMessageIdWithMetaData messageFromImapUid, CassandraId mailboxId, CassandraMessageId messageId) { + return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId)) + .next() + .<Inconsistency>map(OrphanImapUidEntry::new) + .switchIfEmpty(Mono.just(NO_INCONSISTENCY)); } private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) { @@ -434,12 +453,17 @@ public class SolveMessageInconsistenciesService { } private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) { + return messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) + .map(uidRecord -> NO_INCONSISTENCY) + .next() + .switchIfEmpty(detectOrphanMessageIdEntry(message)) + .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message))); + } + + private Mono<Inconsistency> detectOrphanMessageIdEntry(ComposedMessageIdWithMetaData message) { return messageIdDAO.retrieve((CassandraId) message.getComposedMessageId().getMailboxId(), message.getComposedMessageId().getUid()) .handle(publishIfPresent()) - .flatMap(upToDateMessage -> messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId())) - .map(uidRecord -> NO_INCONSISTENCY) - .switchIfEmpty(Mono.just(new OrphanMessageIdEntry(message))) - .next()) - .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message))); + .<Inconsistency>map(OrphanMessageIdEntry::new) + .switchIfEmpty(Mono.just(NO_INCONSISTENCY)); } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
