This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c85bc23230dc182ad85ede5b132c5474a077d161 Author: Benoit TELLIER <[email protected]> AuthorDate: Mon Dec 1 15:33:44 2025 +0100 JAMES-3340 Refactor emailQuery view code - Factory method for hiding collapseThread - Mutualize initial fetching and iterations - Mutualize backend fetch limit computation - Mutualize limit existence checks This results in a massive simplification of the DAO objects and translates into less code written. --- .../projections/CassandraEmailQueryView.java | 100 +++++-------- .../projections/PostgresEmailQueryViewDAO.java | 159 +++++++-------------- .../jmap/api/projections/EmailQueryViewUtils.java | 106 ++++++++------ 3 files changed, 141 insertions(+), 224 deletions(-) diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java index f75b39f3f0..1a085d3912 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java @@ -24,8 +24,6 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; -import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch; -import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithMaybeCollapseThreads; import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.DATE_LOOKUP_TABLE; import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MAILBOX_ID; 
import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MESSAGE_ID; @@ -45,6 +43,7 @@ import jakarta.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.jmap.api.projections.EmailQueryView; +import org.apache.james.jmap.api.projections.EmailQueryViewUtils; import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry; import org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable; import org.apache.james.mailbox.cassandra.ids.CassandraId; @@ -187,38 +186,24 @@ public class CassandraEmailQueryView implements EmailQueryView { @Override public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - return listMailboxContentSortedBySentAtWithBackendLimit((CassandraId) mailboxId, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSortedBySentAtWithBackendLimit(CassandraId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentBySentAt.bind() - .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) - .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) - .map(asEmailEntry(SENT_AT)); + CassandraId cassandraId = (CassandraId) mailboxId; - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> executor.executeRows(listMailboxContentBySentAt.bind() + .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) + .setInt(LIMIT_MARKER, 
backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT))); } @Override public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - return listMailboxContentSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentByReceivedAt.bind() - .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) - .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) - .map(asEmailEntry(RECEIVED_AT)); + CassandraId cassandraId = (CassandraId) mailboxId; - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> executor.executeRows(listMailboxContentByReceivedAt.bind() + .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(RECEIVED_AT))); } @Override @@ -247,59 +232,38 @@ public class CassandraEmailQueryView implements EmailQueryView { @Override public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - return listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, 
since, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentSinceReceivedAt.bind() - .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) - .setInstant(RECEIVED_AT, since.toInstant()) - .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) - .map(asEmailEntry(SENT_AT)); + CassandraId cassandraId = (CassandraId) mailboxId; - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> executor.executeRows(listMailboxContentSinceReceivedAt.bind() + .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) + .setInstant(RECEIVED_AT, since.toInstant()) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT))); } @Override public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - return listMailboxContentBeforeSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, since, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentBeforeReceivedAt.bind() - .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) 
- .setInstant(RECEIVED_AT, since.toInstant()) - .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) - .map(asEmailEntry(SENT_AT)); + CassandraId cassandraId = (CassandraId) mailboxId; - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> executor.executeRows(listMailboxContentBeforeReceivedAt.bind() + .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) + .setInstant(RECEIVED_AT, since.toInstant()) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT))); } @Override public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - return listMailboxContentSinceSentAtWithBackendLimit((CassandraId) mailboxId, since, limit, collapseThreads, backendFetchLimit); - } + CassandraId cassandraId = (CassandraId) mailboxId; - private Flux<MessageId> listMailboxContentSinceSentAtWithBackendLimit(CassandraId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentSinceSentAt.bind() - .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> executor.executeRows(listMailboxContentSinceSentAt.bind() + .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) .setInstant(SENT_AT, since.toInstant()) .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) - .map(asEmailEntry(SENT_AT)); - - return 
messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + .map(asEmailEntry(SENT_AT))); } private Function<Row, EmailEntry> asEmailEntry(CqlIdentifier dateField) { diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java index ce2f235b81..4074f8a6a9 100644 --- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java +++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java @@ -19,8 +19,6 @@ package org.apache.james.jmap.postgres.projections; -import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch; -import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithMaybeCollapseThreads; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MAILBOX_ID; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MESSAGE_ID; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.PK_CONSTRAINT_NAME; @@ -39,6 +37,7 @@ import jakarta.inject.Inject; import jakarta.inject.Named; import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.jmap.api.projections.EmailQueryViewUtils; import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.ThreadId; @@ -48,13 +47,11 @@ import org.apache.james.util.streams.Limit; import org.jooq.Field; 
import org.jooq.Record; -import com.google.common.base.Preconditions; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class PostgresEmailQueryViewDAO { - private PostgresExecutor postgresExecutor; + private final PostgresExecutor postgresExecutor; @Inject public PostgresEmailQueryViewDAO(@Named(PostgresExecutor.BY_PASS_RLS_INJECT) PostgresExecutor postgresExecutor) { @@ -62,127 +59,67 @@ public class PostgresEmailQueryViewDAO { } public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - - return listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSortedBySentAtWithBackendLimit(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .orderBy(SENT_AT.desc()) - .limit(backendFetchLimit.getLimit().get()))) - .map(asEmailEntry(SENT_AT)); - - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .orderBy(SENT_AT.desc()) + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(SENT_AT))); } public Flux<MessageId> 
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - - return listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSortedByReceivedAtWithBackendLimit(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .orderBy(RECEIVED_AT.desc()) - .limit(backendFetchLimit.getLimit().get()))) - .map(asEmailEntry(RECEIVED_AT)); - - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .orderBy(RECEIVED_AT.desc()) + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(RECEIVED_AT))); } public Flux<MessageId> listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - - return listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime())) - .orderBy(SENT_AT.desc()) - .limit(backendFetchLimit.getLimit().get()))) - .map(asEmailEntry(SENT_AT)); - - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime())) + .orderBy(SENT_AT.desc()) + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(SENT_AT))); } public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - - return listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, 
THREAD_ID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime())) - .orderBy(RECEIVED_AT.desc()) - .limit(backendFetchLimit.getLimit().get()))) - .map(asEmailEntry(RECEIVED_AT)); - - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime())) + .orderBy(RECEIVED_AT.desc()) + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(RECEIVED_AT))); } public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - - return listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime())) - .orderBy(RECEIVED_AT.desc()) - .limit(backendFetchLimit.getLimit().get()))) - .map(asEmailEntry(RECEIVED_AT)); - - return 
messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime())) + .orderBy(RECEIVED_AT.desc()) + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(RECEIVED_AT))); } public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - - Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); - - return listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); - } - - private Flux<MessageId> listMailboxContentSinceSentAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { - Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime())) - .orderBy(SENT_AT.desc()) - .limit(backendFetchLimit.getLimit().get()))) - .map(asEmailEntry(SENT_AT)); - - return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, baseEntries, collapseThreads, - newLimit -> listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads) + .resolve(backendFetchLimit -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime())) + .orderBy(SENT_AT.desc()) + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(SENT_AT))); } private Function<Record, EmailEntry> asEmailEntry(Field<OffsetDateTime> dateField) { diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java index f604ce68f5..3bc7342336 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java @@ -29,6 +29,7 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.ThreadId; import org.apache.james.util.streams.Limit; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; @@ -77,56 +78,71 @@ public class EmailQueryViewUtils { } } - public static Limit backendLimitFetch(Limit limit, boolean collapseThreads) { - if (collapseThreads) { - return Limit.limit(limit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); + public interface QueryViewExtender { + static QueryViewExtender of(Limit initialLimit, boolean collapseThread) { + Preconditions.checkArgument(!initialLimit.isUnlimited(), "Limit should be defined"); + if (!collapseThread) { + return new NoThreadCollapsing(initialLimit); + } + Limit backendFetchLimit = Limit.limit(initialLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); + return new WithTreadCollapsing(initialLimit, backendFetchLimit); } - return limit; - } - public static Flux<MessageId> messagesWithCollapseThreads(Limit limit, Limit backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, 
Flux<MessageId>> listMessagesCallbackFunction) { - return baseEntries.collectList() - .flatMapMany(results -> { - List<EmailEntry> distinctByThreadId = distinctByThreadId(results); - boolean hasEnoughResults = distinctByThreadId.size() >= limit.getLimit().get(); - boolean isExhaustive = results.size() < backendFetchLimit.getLimit().get(); - if (hasEnoughResults || isExhaustive) { - return Flux.fromIterable(distinctByThreadId) - .take(limit.getLimit().get()) - .map(EmailEntry::getMessageId); - } - Limit newBackendFetchLimit = Limit.from(backendFetchLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); - return listMessagesCallbackFunction.apply(newBackendFetchLimit); - }); - } + Flux<MessageId> resolve(Function<Limit, Flux<EmailEntry>> fetchMoreResults); - public static Flux<MessageId> messagesWithMaybeCollapseThreads(Limit limit, Limit backendFetchLimit, Flux<EmailEntry> baseEntries, boolean collapseThreads, Function<Limit, Flux<MessageId>> listMessagesCallbackFunction) { - if (collapseThreads) { - return baseEntries.collectList() - .flatMapMany(results -> { - List<EmailEntry> distinctByThreadId = distinctByThreadId(results); - boolean hasEnoughResults = distinctByThreadId.size() >= limit.getLimit().get(); - boolean isExhaustive = results.size() < backendFetchLimit.getLimit().get(); - if (hasEnoughResults || isExhaustive) { - return Flux.fromIterable(distinctByThreadId) - .take(limit.getLimit().get()) - .map(EmailEntry::getMessageId); - } - Limit newBackendFetchLimit = Limit.from(backendFetchLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); - return listMessagesCallbackFunction.apply(newBackendFetchLimit); - }); + class NoThreadCollapsing implements QueryViewExtender { + private final Limit limit; + + public NoThreadCollapsing(Limit limit) { + this.limit = limit; + } + + @Override + public Flux<MessageId> resolve(Function<Limit, Flux<EmailEntry>> fetchMoreResults) { + return fetchMoreResults.apply(limit).map(EmailEntry::getMessageId); + } } - 
return baseEntries.map(EmailEntry::getMessageId); - } - private static List<EmailEntry> distinctByThreadId(List<EmailEntry> emailEntries) { - ImmutableList.Builder<EmailEntry> list = ImmutableList.builder(); - HashSet<ThreadId> threadIdHashSet = new HashSet<>(); - emailEntries.forEach(emailEntry -> { - if (threadIdHashSet.add(emailEntry.getThreadId())) { - list.add(emailEntry); + class WithTreadCollapsing implements QueryViewExtender { + private final Limit initialLimit; + private final Limit backendFetchLimit; + + private WithTreadCollapsing(Limit initialLimit, Limit backendFetchLimit) { + this.initialLimit = initialLimit; + this.backendFetchLimit = backendFetchLimit; + } + + @Override + public Flux<MessageId> resolve(Function<Limit, Flux<EmailEntry>> fetchMoreResults) { + return fetchMoreResults.apply(backendFetchLimit) + .collectList() + .flatMapMany(results -> { + List<EmailEntry> distinctByThreadId = distinctByThreadId(results); + boolean hasEnoughResults = distinctByThreadId.size() >= initialLimit.getLimit().get(); + boolean isExhaustive = results.size() < backendFetchLimit.getLimit().get(); + if (hasEnoughResults || isExhaustive) { + return Flux.fromIterable(distinctByThreadId) + .take(initialLimit.getLimit().get()) + .map(EmailEntry::getMessageId); + } + return increaseBackendetchLimit().resolve(fetchMoreResults); + }); + } + + private WithTreadCollapsing increaseBackendetchLimit() { + return new WithTreadCollapsing(initialLimit, Limit.from(backendFetchLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER)); } - }); - return list.build(); + + private List<EmailEntry> distinctByThreadId(List<EmailEntry> emailEntries) { + ImmutableList.Builder<EmailEntry> list = ImmutableList.builder(); + HashSet<ThreadId> threadIdHashSet = new HashSet<>(); + emailEntries.forEach(emailEntry -> { + if (threadIdHashSet.add(emailEntry.getThreadId())) { + list.add(emailEntry); + } + }); + return list.build(); + } + } } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
