This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 941091f9b143b407445abfa676f0e54cf51e3970
Author: Rene Cordier <[email protected]>
AuthorDate: Tue Nov 25 11:20:16 2025 +0700

    JAMES-3340 Postgres implementation for collapseThreads handling with EmailQueryView
---
 .../projections/CassandraEmailQueryView.java       |  83 +----------
 .../projections/PostgresEmailQueryView.java        |  14 +-
 .../projections/PostgresEmailQueryViewDAO.java     | 152 +++++++++++++++++----
 .../jmap/api/projections/EmailQueryViewUtils.java  | 113 +++++++++++++++
 4 files changed, 251 insertions(+), 111 deletions(-)

diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
index de4942a37f..4ddb6f69ab 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java
@@ -24,6 +24,8 @@ import static 
com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
+import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch;
+import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithCollapseThreads;
 import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.DATE_LOOKUP_TABLE;
 import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MAILBOX_ID;
 import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MESSAGE_ID;
@@ -36,9 +38,6 @@ import static 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQu
 import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.function.Function;
 
@@ -46,6 +45,7 @@ import jakarta.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.jmap.api.projections.EmailQueryView;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry;
 import 
org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
@@ -61,55 +61,12 @@ import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraEmailQueryView implements EmailQueryView {
     private static final String LIMIT_MARKER = "LIMIT_BIND_MARKER";
-    private static final int COLLAPSE_THREADS_LIMIT_MULTIPLIER = 4;
-
-    private class EmailEntry {
-        private final MessageId messageId;
-        private final ThreadId threadId;
-        private final Instant messageDate;
-
-        EmailEntry(MessageId messageId, ThreadId threadId, Instant 
messageDate) {
-            this.messageId = messageId;
-            this.threadId = threadId;
-            this.messageDate = messageDate;
-        }
-
-        public MessageId getMessageId() {
-            return messageId;
-        }
-
-        public ThreadId getThreadId() {
-            return threadId;
-        }
-
-        public Instant getMessageDate() {
-            return messageDate;
-        }
-
-        @Override
-        public final boolean equals(Object o) {
-            if (o instanceof EmailEntry) {
-                EmailEntry entry = (EmailEntry) o;
-
-                return Objects.equals(this.messageId, entry.messageId)
-                    && Objects.equals(this.threadId, entry.threadId)
-                    && Objects.equals(this.messageDate, entry.messageDate);
-            }
-            return false;
-        }
-
-        @Override
-        public final int hashCode() {
-            return Objects.hash(messageId, threadId, messageDate);
-        }
-    }
 
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement listMailboxContentBySentAt;
@@ -250,33 +207,6 @@ public class CassandraEmailQueryView implements 
EmailQueryView {
         return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    private Flux<MessageId> messagesWithCollapseThreads(Limit limit, Limit 
backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, 
Flux<MessageId>> listMessagesCallbackFunction) {
-        return baseEntries.collectList()
-            .flatMapMany(results -> {
-                List<EmailEntry> distinctByThreadId = 
distinctByThreadId(results);
-                boolean hasEnoughResults = distinctByThreadId.size() >= 
limit.getLimit().get();
-                boolean isExhaustive = results.size() < 
backendFetchLimit.getLimit().get();
-                if (hasEnoughResults || isExhaustive) {
-                    return Flux.fromIterable(distinctByThreadId)
-                        .take(limit.getLimit().get())
-                        .map(EmailEntry::getMessageId);
-                }
-                Limit newBackendFetchLimit = 
Limit.from(backendFetchLimit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
-                return 
listMessagesCallbackFunction.apply(newBackendFetchLimit);
-            });
-    }
-
-    private List<EmailEntry> distinctByThreadId(List<EmailEntry> emailEntries) 
{
-        ImmutableList.Builder<EmailEntry> list = ImmutableList.builder();
-        HashSet<ThreadId> threadIdHashSet = new HashSet<>();
-        emailEntries.forEach(emailEntry -> {
-            if (threadIdHashSet.add(emailEntry.getThreadId())) {
-                list.add(emailEntry);
-            }
-        });
-        return list.build();
-    }
-
     @Override
     public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
@@ -392,13 +322,6 @@ public class CassandraEmailQueryView implements 
EmailQueryView {
         return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    private Limit backendLimitFetch(Limit limit, boolean collapseThreads) {
-        if (collapseThreads) {
-            return Limit.limit(limit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
-        }
-        return limit;
-    }
-
     private Function<Row, EmailEntry> asEmailEntry(CqlIdentifier dateField) {
         return (Row row) -> {
             CassandraMessageId messageId = 
CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java
index 4831d046c8..5398ef41c5 100644
--- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java
+++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java
@@ -35,7 +35,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class PostgresEmailQueryView implements EmailQueryView {
-    private PostgresEmailQueryViewDAO emailQueryViewDAO;
+    private final PostgresEmailQueryViewDAO emailQueryViewDAO;
 
     @Inject
     public PostgresEmailQueryView(PostgresEmailQueryViewDAO emailQueryViewDAO) 
{
@@ -44,32 +44,32 @@ public class PostgresEmailQueryView implements 
EmailQueryView {
 
     @Override
     public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
-        return 
emailQueryViewDAO.listMailboxContentSortedBySentAt(PostgresMailboxId.class.cast(mailboxId),
 limit);
+        return 
emailQueryViewDAO.listMailboxContentSortedBySentAt(PostgresMailboxId.class.cast(mailboxId),
 limit, collapseThreads);
     }
 
     @Override
     public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
-        return 
emailQueryViewDAO.listMailboxContentSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId),
 limit);
+        return 
emailQueryViewDAO.listMailboxContentSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId),
 limit, collapseThreads);
     }
 
     @Override
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit, boolean collapseThreads) {
-        return 
emailQueryViewDAO.listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit);
+        return 
emailQueryViewDAO.listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit, collapseThreads);
     }
 
     @Override
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        return 
emailQueryViewDAO.listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit);
+        return 
emailQueryViewDAO.listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit, collapseThreads);
     }
 
     @Override
     public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit, boolean collapseThreads) {
-        return 
emailQueryViewDAO.listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit);
+        return 
emailQueryViewDAO.listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit, collapseThreads);
     }
 
     @Override
     public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        return 
emailQueryViewDAO.listMailboxContentSinceSentAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit);
+        return 
emailQueryViewDAO.listMailboxContentSinceSentAt(PostgresMailboxId.class.cast(mailboxId),
 since, limit, collapseThreads);
     }
 
     @Override
diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
index 609662856c..a8c0296c12 100644
--- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
+++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.postgres.projections;
 
+import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch;
+import static 
org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithCollapseThreads;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MAILBOX_ID;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MESSAGE_ID;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.PK_CONSTRAINT_NAME;
@@ -27,17 +29,24 @@ import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewD
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.TABLE_NAME;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.THREAD_ID;
 
+import java.time.Instant;
+import java.time.OffsetDateTime;
 import java.time.ZonedDateTime;
+import java.util.UUID;
+import java.util.function.Function;
 
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
 
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
 import org.apache.james.mailbox.postgres.PostgresMessageId;
 import org.apache.james.util.streams.Limit;
+import org.jooq.Field;
+import org.jooq.Record;
 
 import com.google.common.base.Preconditions;
 
@@ -52,74 +61,169 @@ public class PostgresEmailQueryViewDAO {
         this.postgresExecutor = postgresExecutor;
     }
 
-    public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId 
mailboxId, Limit limit) {
+    public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
+        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
+
+        return listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, 
limit, collapseThreads, backendFetchLimit);
+    }
+
+    private Flux<MessageId> 
listMailboxContentSortedBySentAtWithBackendLimit(PostgresMailboxId mailboxId, 
Limit limit, boolean collapseThreads, Limit backendFetchLimit) {
+        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .orderBy(SENT_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+                .limit(backendFetchLimit.getLimit().get())))
+            .map(asEmailEntry(SENT_AT));
+
+        if (collapseThreads) {
+            return messagesWithCollapseThreads(limit, backendFetchLimit, 
baseEntries,
+                newLimit -> 
listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, 
collapseThreads, newLimit));
+        }
+
+        return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    public Flux<MessageId> 
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit) {
+    public Flux<MessageId> 
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit, 
boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
+        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
+
+        return listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, 
limit, collapseThreads, backendFetchLimit);
+    }
+
+    private Flux<MessageId> 
listMailboxContentSortedByReceivedAtWithBackendLimit(PostgresMailboxId 
mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) {
+        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .orderBy(RECEIVED_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+                .limit(backendFetchLimit.getLimit().get())))
+            .map(asEmailEntry(RECEIVED_AT));
+
+        if (collapseThreads) {
+            return messagesWithCollapseThreads(limit, backendFetchLimit, 
baseEntries,
+                newLimit -> 
listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, 
collapseThreads, newLimit));
+        }
+
+        return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
+    public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
+        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
+
+        return 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, backendFetchLimit);
+    }
+
+    private Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
+        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
                 .orderBy(SENT_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+                .limit(backendFetchLimit.getLimit().get())))
+            .map(asEmailEntry(SENT_AT));
+
+        if (collapseThreads) {
+            return messagesWithCollapseThreads(limit, backendFetchLimit, 
baseEntries,
+                newLimit -> 
listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, newLimit));
+        }
+
+        return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
+    public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
+        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
+
+        return 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, 
since, limit, collapseThreads, backendFetchLimit);
+    }
+
+    private Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(PostgresMailboxId
 mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
+        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
                 .orderBy(RECEIVED_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+                .limit(backendFetchLimit.getLimit().get())))
+            .map(asEmailEntry(RECEIVED_AT));
+
+        if (collapseThreads) {
+            return messagesWithCollapseThreads(limit, backendFetchLimit, 
baseEntries,
+                newLimit -> 
listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, 
since, limit, collapseThreads, newLimit));
+        }
+
+        return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
+    public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
+        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
+
+        return 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, backendFetchLimit);
+    }
+
+    private Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
+        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
                 .orderBy(RECEIVED_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+                .limit(backendFetchLimit.getLimit().get())))
+            .map(asEmailEntry(RECEIVED_AT));
+
+        if (collapseThreads) {
+            return messagesWithCollapseThreads(limit, backendFetchLimit, 
baseEntries,
+                newLimit -> 
listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, newLimit));
+        }
+
+        return baseEntries.map(EmailEntry::getMessageId);
     }
 
-    public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit) {
+    public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
+        Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads);
+
+        return listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, 
limit, collapseThreads, backendFetchLimit);
+    }
+
+    private Flux<MessageId> 
listMailboxContentSinceSentAtWithBackendLimit(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads, Limit 
backendFetchLimit) {
+        Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
                 .orderBy(SENT_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+                .limit(backendFetchLimit.getLimit().get())))
+            .map(asEmailEntry(SENT_AT));
+
+        if (collapseThreads) {
+            return messagesWithCollapseThreads(limit, backendFetchLimit, 
baseEntries,
+                newLimit -> 
listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, 
collapseThreads, newLimit));
+        }
+
+        return baseEntries.map(EmailEntry::getMessageId);
+    }
+
+    private Function<Record, EmailEntry> asEmailEntry(Field<OffsetDateTime> 
dateField) {
+        return (Record record) -> {
+            PostgresMessageId messageId = 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID));
+            ThreadId threadId = getThreadIdFromRecord(record, messageId);
+            Instant messageDate = record.get(dateField).toInstant();
+            return new EmailEntry(messageId, threadId, messageDate);
+        };
+    }
+
+    private ThreadId getThreadIdFromRecord(Record record, MessageId messageId) 
{
+        UUID threadIdUUID = record.get(THREAD_ID);
+        if (threadIdUUID == null) {
+            return ThreadId.fromBaseMessageId(messageId);
+        }
+        return 
ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(threadIdUUID));
     }
 
     public Mono<Void> delete(PostgresMailboxId mailboxId, PostgresMessageId 
messageId) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java
new file mode 100644
index 0000000000..42f8ee454f
--- /dev/null
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java
@@ -0,0 +1,113 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.api.projections;
+
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.util.streams.Limit;
+
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Flux;
+
+public class EmailQueryViewUtils {
+    private static final int COLLAPSE_THREADS_LIMIT_MULTIPLIER = 4;
+
+    public static class EmailEntry {
+        private final MessageId messageId;
+        private final ThreadId threadId;
+        private final Instant messageDate;
+
+        public EmailEntry(MessageId messageId, ThreadId threadId, Instant 
messageDate) {
+            this.messageId = messageId;
+            this.threadId = threadId;
+            this.messageDate = messageDate;
+        }
+
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        public ThreadId getThreadId() {
+            return threadId;
+        }
+
+        public Instant getMessageDate() {
+            return messageDate;
+        }
+
+        @Override
+        public final boolean equals(Object o) {
+            if (o instanceof EmailEntry) {
+                EmailEntry entry = (EmailEntry) o;
+
+                return Objects.equals(this.messageId, entry.messageId)
+                    && Objects.equals(this.threadId, entry.threadId)
+                    && Objects.equals(this.messageDate, entry.messageDate);
+            }
+            return false;
+        }
+
+        @Override
+        public final int hashCode() {
+            return Objects.hash(messageId, threadId, messageDate);
+        }
+    }
+
+    public static Limit backendLimitFetch(Limit limit, boolean 
collapseThreads) {
+        if (collapseThreads) {
+            return Limit.limit(limit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
+        }
+        return limit;
+    }
+
+    public static Flux<MessageId> messagesWithCollapseThreads(Limit limit, 
Limit backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, 
Flux<MessageId>> listMessagesCallbackFunction) {
+        return baseEntries.collectList()
+            .flatMapMany(results -> {
+                List<EmailEntry> distinctByThreadId = 
distinctByThreadId(results);
+                boolean hasEnoughResults = distinctByThreadId.size() >= 
limit.getLimit().get();
+                boolean isExhaustive = results.size() < 
backendFetchLimit.getLimit().get();
+                if (hasEnoughResults || isExhaustive) {
+                    return Flux.fromIterable(distinctByThreadId)
+                        .take(limit.getLimit().get())
+                        .map(EmailEntry::getMessageId);
+                }
+                Limit newBackendFetchLimit = 
Limit.from(backendFetchLimit.getLimit().get() * 
COLLAPSE_THREADS_LIMIT_MULTIPLIER);
+                return 
listMessagesCallbackFunction.apply(newBackendFetchLimit);
+            });
+    }
+
+    private static List<EmailEntry> distinctByThreadId(List<EmailEntry> 
emailEntries) {
+        ImmutableList.Builder<EmailEntry> list = ImmutableList.builder();
+        HashSet<ThreadId> threadIdHashSet = new HashSet<>();
+        emailEntries.forEach(emailEntry -> {
+            if (threadIdHashSet.add(emailEntry.getThreadId())) {
+                list.add(emailEntry);
+            }
+        });
+        return list.build();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to