This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ef3003a756147756bf7e45fc66745893286c2123 Author: Tung Tran <[email protected]> AuthorDate: Mon Mar 18 18:05:33 2024 +0700 JAMES-2586 Avoid sorting PG messages --- .../james/mailbox/postgres/mail/AttachmentLoader.java | 18 ++++++++---------- .../mailbox/postgres/mail/PostgresMessageIdMapper.java | 9 ++++----- .../mailbox/postgres/mail/PostgresMessageMapper.java | 8 +++----- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java index 874d463c2c..4927867ce4 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java @@ -31,7 +31,6 @@ import org.apache.james.mailbox.model.MessageAttachmentMetadata; import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.ReactorUtils; import org.jooq.Record; import com.google.common.collect.ImmutableMap; @@ -49,19 +48,18 @@ public class AttachmentLoader { public Flux<Pair<SimpleMailboxMessage.Builder, Record>> addAttachmentToMessage(Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagePublisher, MessageMapper.FetchType fetchType) { - return findMessagePublisher.flatMap(pair -> { - if (fetchType == MessageMapper.FetchType.FULL || fetchType == MessageMapper.FetchType.ATTACHMENTS_METADATA) { - return Mono.fromCallable(() -> pair.getRight().get(ATTACHMENT_METADATA)) - .map(e -> toMap((AttachmentsDTO) e)) + if (fetchType != MessageMapper.FetchType.FULL && fetchType != MessageMapper.FetchType.ATTACHMENTS_METADATA) { + return findMessagePublisher; + } + + return findMessagePublisher.collectList() // convert to list to avoid hanging the database connection with Jooq + .flatMapMany(list -> Flux.fromIterable(list) + .flatMapSequential(pair -> Mono.fromCallable(() -> toMap(pair.getRight().get(ATTACHMENT_METADATA))) .flatMap(this::getAttachments) .map(messageAttachmentMetadata -> { pair.getLeft().addAttachments(messageAttachmentMetadata); return pair; - }).switchIfEmpty(Mono.just(pair)); - } else { - return Mono.just(pair); - } - }, ReactorUtils.DEFAULT_CONCURRENCY); + }).switchIfEmpty(Mono.just(pair)))); } private Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> toMap(AttachmentsDTO attachmentRepresentations) { diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java index e9df32ae4a..01b7304eda 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Clock; import java.util.Collection; -import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.function.Function; @@ -137,17 +136,17 @@ public class PostgresMessageIdMapper implements MessageIdMapper { @Override public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) { - Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessageWithoutFullContentPublisher = mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()), fetchType); - Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = attachmentLoader.addAttachmentToMessage(fetchMessageWithoutFullContentPublisher, fetchType); + Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = + mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()), fetchType) + .transform(pairFlux -> attachmentLoader.addAttachmentToMessage(pairFlux, fetchType)); if (fetchType == MessageMapper.FetchType.FULL) { return fetchMessagePublisher - .flatMap(messageBuilderAndRecord -> { + .flatMapSequential(messageBuilderAndRecord -> { SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft(); return retrieveFullContent(messageBuilderAndRecord.getRight()) .map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build()); }, ReactorUtils.DEFAULT_CONCURRENCY) - .sort(Comparator.comparing(MailboxMessage::getUid)) .map(message -> message); } else { return fetchMessagePublisher diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java index 324c244a38..ef9bb0afe9 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java @@ -27,7 +27,6 @@ import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.Messa import java.io.IOException; import java.io.InputStream; import java.time.Clock; -import java.util.Comparator; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -136,17 +135,16 @@ public class PostgresMessageMapper implements MessageMapper { @Override public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType fetchType, int limitAsInt) { - Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessageWithoutFullContentPublisher = fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt); - Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = attachmentLoader.addAttachmentToMessage(fetchMessageWithoutFullContentPublisher, fetchType); + Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt) + .transform(pairFlux -> attachmentLoader.addAttachmentToMessage(pairFlux, fetchType)); if (fetchType == FetchType.FULL) { return fetchMessagePublisher - .flatMap(messageBuilderAndRecord -> { + .flatMapSequential(messageBuilderAndRecord -> { SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft(); return retrieveFullContent(messageBuilderAndRecord.getRight()) .map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build()); }, ReactorUtils.DEFAULT_CONCURRENCY) - .sort(Comparator.comparing(MailboxMessage::getUid)) .map(message -> message); } else { return fetchMessagePublisher --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
