This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c85bc23230dc182ad85ede5b132c5474a077d161
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Dec 1 15:33:44 2025 +0100

    JAMES-3340 Refactor emailQuery view code
    
     - Factory method for hiding collapseThread
     - Mutualize initial fetching and iterations
     - Mutualize backend fetch limit computation
     - Mutualize limit existence checks
    
     This results in a massive simplification of the
     DAO objects and translates into less code written.
---
 .../projections/CassandraEmailQueryView.java       | 100 +++++--------
 .../projections/PostgresEmailQueryViewDAO.java     | 159 +++++++--------------
 .../jmap/api/projections/EmailQueryViewUtils.java  | 106 ++++++++------
 3 files changed, 141 insertions(+), 224 deletions(-)

diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
index f75b39f3f0..1a085d3912 100644
--- 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
@@ -24,8 +24,6 @@ import static 
com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
-import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch;
-import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithMaybeCollapseThreads;
 import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.DATE_LOOKUP_TABLE;
 import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MAILBOX_ID;
 import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MESSAGE_ID;
@@ -45,6 +43,7 @@ import jakarta.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.jmap.api.projections.EmailQueryView;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils;
 import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry;
 import 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -187,38 +186,24 @@ public class CassandraEmailQueryView implements 
EmailQueryView {
 
     @Override
     public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-        return listMailboxContentSortedBySentAtWithBackendLimit((CassandraId) 
mailboxId, limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSortedBySentAtWithBackendLimit(CassandraId mailboxId, Limit 
limit, boolean collapseThreads, Limit backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = 
executor.executeRows(listMailboxContentBySentAt.bind()
-            .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID)
-            .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
-            .map(asEmailEntry(SENT_AT));
+        CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, 
collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentBySentAt.bind()
+                    .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                    .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     @Override
     public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-        return 
listMailboxContentSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, 
limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, 
Limit limit, boolean collapseThreads, Limit backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = 
executor.executeRows(listMailboxContentByReceivedAt.bind()
-                .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID)
-                .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
-            .map(asEmailEntry(RECEIVED_AT));
+        CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, 
collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentByReceivedAt.bind()
+                    .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                    .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
     @Override
@@ -247,59 +232,38 @@ public class CassandraEmailQueryView implements 
EmailQueryView {
 
     @Override
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-        return 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit((CassandraId) 
mailboxId, since, limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(CassandraId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = 
executor.executeRows(listMailboxContentSinceReceivedAt.bind()
-                .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID)
-                .setInstant(RECEIVED_AT, since.toInstant())
-                .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
-            .map(asEmailEntry(SENT_AT));
+        CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, 
since, limit, collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentSinceReceivedAt.bind()
+                    .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                    .setInstant(RECEIVED_AT, since.toInstant())
+                    .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     @Override
     public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-        return 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit((CassandraId) 
mailboxId, since, limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(CassandraId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = 
executor.executeRows(listMailboxContentBeforeReceivedAt.bind()
-                .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID)
-                .setInstant(RECEIVED_AT, since.toInstant())
-                .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
-            .map(asEmailEntry(SENT_AT));
+        CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentBeforeReceivedAt.bind()
+                        .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                        .setInstant(RECEIVED_AT, since.toInstant())
+                        .setInt(LIMIT_MARKER, 
backendFetchLimit.getLimit().get()))
+                    .map(asEmailEntry(SENT_AT)));
     }
 
     @Override
     public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-        return listMailboxContentSinceSentAtWithBackendLimit((CassandraId) 
mailboxId, since, limit, collapseThreads, backendFetchLimit);
-    }
+        CassandraId cassandraId = (CassandraId) mailboxId;
 
-    private Flux<MessageId> 
listMailboxContentSinceSentAtWithBackendLimit(CassandraId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = 
executor.executeRows(listMailboxContentSinceSentAt.bind()
-                .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID)
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentSinceSentAt.bind()
+                .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
                 .setInstant(SENT_AT, since.toInstant())
                 .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
-            .map(asEmailEntry(SENT_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, 
collapseThreads, newLimit));
+            .map(asEmailEntry(SENT_AT)));
     }
 
     private Function<Row, EmailEntry> asEmailEntry(CqlIdentifier dateField) {
diff --git 
a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
 
b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
index ce2f235b81..4074f8a6a9 100644
--- 
a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
+++ 
b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.jmap.postgres.projections;
 
-import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch;
-import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithMaybeCollapseThreads;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MAILBOX_ID;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MESSAGE_ID;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.PK_CONSTRAINT_NAME;
@@ -39,6 +37,7 @@ import jakarta.inject.Inject;
 import jakarta.inject.Named;
 
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils;
 import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.ThreadId;
@@ -48,13 +47,11 @@ import org.apache.james.util.streams.Limit;
 import org.jooq.Field;
 import org.jooq.Record;
 
-import com.google.common.base.Preconditions;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class PostgresEmailQueryViewDAO {
-    private PostgresExecutor postgresExecutor;
+    private final PostgresExecutor postgresExecutor;
 
     @Inject
     public 
PostgresEmailQueryViewDAO(@Named(PostgresExecutor.BY_PASS_RLS_INJECT) 
PostgresExecutor postgresExecutor) {
@@ -62,127 +59,67 @@ public class PostgresEmailQueryViewDAO {
     }
 
     public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-
-        return listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, 
limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSortedBySentAtWithBackendLimit(PostgresMailboxId mailboxId, 
Limit limit, boolean collapseThreads, Limit backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .orderBy(SENT_AT.desc())
-                .limit(backendFetchLimit.getLimit().get())))
-            .map(asEmailEntry(SENT_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, 
collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .orderBy(SENT_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     public Flux<MessageId> 
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit, 
boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-
-        return listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, 
limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSortedByReceivedAtWithBackendLimit(PostgresMailboxId 
mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .orderBy(RECEIVED_AT.desc())
-                .limit(backendFetchLimit.getLimit().get())))
-            .map(asEmailEntry(RECEIVED_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, 
collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .orderBy(RECEIVED_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-
-        return 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
-                .orderBy(SENT_AT.desc())
-                .limit(backendFetchLimit.getLimit().get())))
-            .map(asEmailEntry(SENT_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
+                    .orderBy(SENT_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-
-        return 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, 
since, limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(PostgresMailboxId
 mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
-                .orderBy(RECEIVED_AT.desc())
-                .limit(backendFetchLimit.getLimit().get())))
-            .map(asEmailEntry(RECEIVED_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, 
since, limit, collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
+                    .orderBy(RECEIVED_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
     public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-
-        return 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
-                .orderBy(RECEIVED_AT.desc())
-                .limit(backendFetchLimit.getLimit().get())))
-            .map(asEmailEntry(RECEIVED_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
+                    .orderBy(RECEIVED_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
     public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
-
-        return listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, backendFetchLimit);
-    }
-
-    private Flux<MessageId> 
listMailboxContentSinceSentAtWithBackendLimit(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
-        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
-                .orderBy(SENT_AT.desc())
-                .limit(backendFetchLimit.getLimit().get())))
-            .map(asEmailEntry(SENT_AT));
-
-        return messagesWithMaybeCollapseThreads(limit, backendFetchLimit, 
baseEntries, collapseThreads,
-            newLimit -> 
listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, 
collapseThreads, newLimit));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
+                    .orderBy(SENT_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     private Function<Record, EmailEntry> asEmailEntry(Field<OffsetDateTime> 
dateField) {
diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java
index f604ce68f5..3bc7342336 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java
@@ -29,6 +29,7 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.util.streams.Limit;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
@@ -77,56 +78,71 @@ public class EmailQueryViewUtils {
         }
     }
 
-    public static Limit backendLimitFetch(Limit limit, boolean 
collapseThreads) {
-        if (collapseThreads) {
-            return Limit.limit(limit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
+    public interface QueryViewExtender {
+        static QueryViewExtender of(Limit initialLimit, boolean 
collapseThread) {
+            Preconditions.checkArgument(!initialLimit.isUnlimited(), "Limit 
should be defined");
+            if (!collapseThread) {
+                return new NoThreadCollapsing(initialLimit);
+            }
+            Limit backendFetchLimit = 
Limit.limit(initialLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER);
+            return new WithTreadCollapsing(initialLimit, backendFetchLimit);
         }
-        return limit;
-    }
 
-    public static Flux<MessageId> messagesWithCollapseThreads(Limit limit, 
Limit backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, 
Flux<MessageId>> listMessagesCallbackFunction) {
-        return baseEntries.collectList()
-            .flatMapMany(results -> {
-                List<EmailEntry> distinctByThreadId = 
distinctByThreadId(results);
-                boolean hasEnoughResults = distinctByThreadId.size() >= 
limit.getLimit().get();
-                boolean isExhaustive = results.size() < 
backendFetchLimit.getLimit().get();
-                if (hasEnoughResults || isExhaustive) {
-                    return Flux.fromIterable(distinctByThreadId)
-                        .take(limit.getLimit().get())
-                        .map(EmailEntry::getMessageId);
-                }
-                Limit newBackendFetchLimit = 
Limit.from(backendFetchLimit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
-                return 
listMessagesCallbackFunction.apply(newBackendFetchLimit);
-            });
-    }
+        Flux<MessageId> resolve(Function<Limit, Flux<EmailEntry>> 
fetchMoreResults);
 
-    public static Flux<MessageId> messagesWithMaybeCollapseThreads(Limit 
limit, Limit backendFetchLimit, Flux<EmailEntry> baseEntries, boolean 
collapseThreads, Function<Limit, Flux<MessageId>> listMessagesCallbackFunction) 
{
-        if (collapseThreads) {
-            return baseEntries.collectList()
-                .flatMapMany(results -> {
-                    List<EmailEntry> distinctByThreadId = 
distinctByThreadId(results);
-                    boolean hasEnoughResults = distinctByThreadId.size() >= 
limit.getLimit().get();
-                    boolean isExhaustive = results.size() < 
backendFetchLimit.getLimit().get();
-                    if (hasEnoughResults || isExhaustive) {
-                        return Flux.fromIterable(distinctByThreadId)
-                            .take(limit.getLimit().get())
-                            .map(EmailEntry::getMessageId);
-                    }
-                    Limit newBackendFetchLimit = 
Limit.from(backendFetchLimit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
-                    return 
listMessagesCallbackFunction.apply(newBackendFetchLimit);
-                });
+        class NoThreadCollapsing implements QueryViewExtender {
+            private final Limit limit;
+
+            public NoThreadCollapsing(Limit limit) {
+                this.limit = limit;
+            }
+
+            @Override
+            public Flux<MessageId> resolve(Function<Limit, Flux<EmailEntry>> 
fetchMoreResults) {
+                return 
fetchMoreResults.apply(limit).map(EmailEntry::getMessageId);
+            }
         }
-        return baseEntries.map(EmailEntry::getMessageId);
-    }
 
-    private static List<EmailEntry> distinctByThreadId(List<EmailEntry> 
emailEntries) {
-        ImmutableList.Builder<EmailEntry> list = ImmutableList.builder();
-        HashSet<ThreadId> threadIdHashSet = new HashSet<>();
-        emailEntries.forEach(emailEntry -> {
-            if (threadIdHashSet.add(emailEntry.getThreadId())) {
-                list.add(emailEntry);
+        class WithTreadCollapsing implements QueryViewExtender {
+            private final Limit initialLimit;
+            private final Limit backendFetchLimit;
+
+            private WithTreadCollapsing(Limit initialLimit, Limit 
backendFetchLimit) {
+                this.initialLimit = initialLimit;
+                this.backendFetchLimit = backendFetchLimit;
+            }
+
+            @Override
+            public Flux<MessageId> resolve(Function<Limit, Flux<EmailEntry>> 
fetchMoreResults) {
+                return fetchMoreResults.apply(backendFetchLimit)
+                    .collectList()
+                    .flatMapMany(results -> {
+                        List<EmailEntry> distinctByThreadId = 
distinctByThreadId(results);
+                        boolean hasEnoughResults = distinctByThreadId.size() 
>= initialLimit.getLimit().get();
+                        boolean isExhaustive = results.size() < 
backendFetchLimit.getLimit().get();
+                        if (hasEnoughResults || isExhaustive) {
+                            return Flux.fromIterable(distinctByThreadId)
+                                .take(initialLimit.getLimit().get())
+                                .map(EmailEntry::getMessageId);
+                        }
+                        return 
increaseBackendetchLimit().resolve(fetchMoreResults);
+                    });
+            }
+
+            private WithTreadCollapsing increaseBackendetchLimit() {
+                return new WithTreadCollapsing(initialLimit, 
Limit.from(backendFetchLimit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER));
             }
-        });
-        return list.build();
+
+            private List<EmailEntry> distinctByThreadId(List<EmailEntry> 
emailEntries) {
+                ImmutableList.Builder<EmailEntry> list = 
ImmutableList.builder();
+                HashSet<ThreadId> threadIdHashSet = new HashSet<>();
+                emailEntries.forEach(emailEntry -> {
+                    if (threadIdHashSet.add(emailEntry.getThreadId())) {
+                        list.add(emailEntry);
+                    }
+                });
+                return list.build();
+            }
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to