This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4333ce0cea55067145b4df6845a0fe9df8f5970e
Author: Rene Cordier <[email protected]>
AuthorDate: Fri Aug 29 16:37:54 2025 +0700

    JAMES-3340 Memory implementation for collapseThreads handling with 
EmailQueryView
---
 .../memory/projections/MemoryEmailQueryView.java   |  41 +++++--
 .../projections/MemoryEmailQueryViewTest.java      | 130 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 10 deletions(-)

diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java
index 2504f027fe..9097f3c8a2 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java
@@ -50,7 +50,9 @@ public class MemoryEmailQueryView implements EmailQueryView {
     public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return Flux.fromIterable(entries.row(mailboxId).values())
+        Flux<Entry> baseEntries = 
Flux.fromIterable(entries.row(mailboxId).values());
+
+        return maybeCollapseThreads(baseEntries, collapseThreads)
             .sort(Comparator.comparing(Entry::getSentAt).reversed())
             .map(Entry::getMessageId)
             .take(limit.getLimit().get());
@@ -60,8 +62,10 @@ public class MemoryEmailQueryView implements EmailQueryView {
     public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return Flux.fromIterable(entries.row(mailboxId).values())
-            .filter(e -> e.getSentAt().isAfter(since) || 
e.getSentAt().isEqual(since))
+        Flux<Entry> baseEntries = 
Flux.fromIterable(entries.row(mailboxId).values())
+            .filter(e -> e.getSentAt().isAfter(since) || 
e.getSentAt().isEqual(since));
+
+        return maybeCollapseThreads(baseEntries, collapseThreads)
             .sort(Comparator.comparing(Entry::getSentAt).reversed())
             .map(Entry::getMessageId)
             .take(limit.getLimit().get());
@@ -71,8 +75,10 @@ public class MemoryEmailQueryView implements EmailQueryView {
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
-        return Flux.fromIterable(entries.row(mailboxId).values())
-            .filter(e -> e.getReceivedAt().isAfter(since) || 
e.getReceivedAt().isEqual(since))
+        Flux<Entry> baseEntries = 
Flux.fromIterable(entries.row(mailboxId).values())
+            .filter(e -> e.getReceivedAt().isAfter(since) || 
e.getReceivedAt().isEqual(since));
+
+        return maybeCollapseThreads(baseEntries, collapseThreads)
             .sort(Comparator.comparing(Entry::getSentAt).reversed())
             .map(Entry::getMessageId)
             .take(limit.getLimit().get());
@@ -80,7 +86,9 @@ public class MemoryEmailQueryView implements EmailQueryView {
 
     @Override
     public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
-        return Flux.fromIterable(entries.row(mailboxId).values())
+        Flux<Entry> baseEntries = 
Flux.fromIterable(entries.row(mailboxId).values());
+
+        return maybeCollapseThreads(baseEntries, collapseThreads)
             .sort(Comparator.comparing(Entry::getReceivedAt).reversed())
             .map(Entry::getMessageId)
             .take(limit.getLimit().get());
@@ -88,8 +96,10 @@ public class MemoryEmailQueryView implements EmailQueryView {
 
     @Override
     public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
-        return Flux.fromIterable(entries.row(mailboxId).values())
-            .filter(e -> e.getReceivedAt().isAfter(since) || 
e.getReceivedAt().isEqual(since))
+        Flux<Entry> baseEntries = 
Flux.fromIterable(entries.row(mailboxId).values())
+            .filter(e -> e.getReceivedAt().isAfter(since) || 
e.getReceivedAt().isEqual(since));
+
+        return maybeCollapseThreads(baseEntries, collapseThreads)
             .sort(Comparator.comparing(Entry::getReceivedAt).reversed())
             .map(Entry::getMessageId)
             .take(limit.getLimit().get());
@@ -97,13 +107,24 @@ public class MemoryEmailQueryView implements 
EmailQueryView {
 
     @Override
     public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime 
before, Limit limit, boolean collapseThreads) {
-        return Flux.fromIterable(entries.row(mailboxId).values())
-            .filter(e -> e.getReceivedAt().isBefore(before) || 
e.getReceivedAt().isEqual(before))
+        Flux<Entry> baseEntries = 
Flux.fromIterable(entries.row(mailboxId).values())
+            .filter(e -> e.getReceivedAt().isBefore(before) || 
e.getReceivedAt().isEqual(before));
+
+        return maybeCollapseThreads(baseEntries, collapseThreads)
             .sort(Comparator.comparing(Entry::getReceivedAt).reversed())
             .map(Entry::getMessageId)
             .take(limit.getLimit().get());
     }
 
+    private Flux<Entry> maybeCollapseThreads(Flux<Entry> entries, boolean 
collapseThreads) {
+        if (collapseThreads) {
+            return entries.groupBy(Entry::getThreadId)
+                .flatMap(group -> group.reduce((e1, e2) ->
+                    e1.getReceivedAt().isAfter(e2.getReceivedAt()) ? e1 : e2));
+        }
+        return entries;
+    }
+
     @Override
     public Mono<Void> delete(MailboxId mailboxId, MessageId messageId) {
         return Mono.fromRunnable(() -> entries.remove(mailboxId, messageId));
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java
index 8c53060187..dfd48ce7ab 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.memory.projections;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import org.apache.james.jmap.api.projections.EmailQueryView;
 import org.apache.james.jmap.api.projections.EmailQueryViewContract;
 import org.apache.james.mailbox.model.MailboxId;
@@ -26,9 +28,13 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.TestId;
 import org.apache.james.mailbox.model.TestMessageId;
 import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.util.streams.Limit;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 public class MemoryEmailQueryViewTest implements EmailQueryViewContract {
+    private static final boolean COLLAPSE_THREAD = true;
+
     private MemoryEmailQueryView testee;
 
     @BeforeEach
@@ -71,5 +77,129 @@ public class MemoryEmailQueryViewTest implements 
EmailQueryViewContract {
         return ThreadId.fromBaseMessageId(TestMessageId.of(1));
     }
 
+    @Test
+    public void 
listMailboxContentSortedBySentAtShouldReturnLatestMessageOfThreadWhenCollapseThreads()
 {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), 
threadId()).block();
+
+        assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), 
Limit.limit(12), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3());
+    }
+
+    @Test
+    public void 
listMailboxContentSortedBySentAtShouldApplyLimitWithCollapseThreads() {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
ThreadId.fromBaseMessageId(TestMessageId.of(2))).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), 
ThreadId.fromBaseMessageId(TestMessageId.of(3))).block();
+        testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), 
threadId()).block();
+
+        assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), 
Limit.limit(2), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId4(), messageId3());
+    }
+
+    @Test
+    public void 
listMailboxContentSinceReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads()
 {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+
+        
assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), 
DATE_3, Limit.limit(12), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3());
+    }
+
+    @Test
+    public void 
listMailboxContentSinceReceivedAtShouldApplyLimitWithCollapseThreads() {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
ThreadId.fromBaseMessageId(TestMessageId.of(2))).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
ThreadId.fromBaseMessageId(TestMessageId.of(3))).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), 
threadId()).block();
+
+        
assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), 
DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3(), messageId2());
+    }
+
+    @Test
+    public void 
listMailboxContentSinceSentdAtShouldReturnLatestMessageOfThreadWhenCollapseThreads()
 {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+
+        assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), 
DATE_2, Limit.limit(12), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3());
+    }
+
+    @Test
+    public void 
listMailboxContentSinceSentAtShouldApplyLimitWithCollapseThreads() {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
ThreadId.fromBaseMessageId(TestMessageId.of(2))).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
ThreadId.fromBaseMessageId(TestMessageId.of(3))).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), 
threadId()).block();
+
+        assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), 
DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3(), messageId2());
+    }
+
+    @Test
+    public void 
listMailboxContentSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads()
 {
+        testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+
+        assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), 
Limit.limit(12), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3());
+    }
+
+    @Test
+    public void 
listMailboxContentSortedByReceivedAtShouldApplyLimitWithCollapseThreads() {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
ThreadId.fromBaseMessageId(TestMessageId.of(2))).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
ThreadId.fromBaseMessageId(TestMessageId.of(3))).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), 
threadId()).block();
+
+        assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), 
Limit.limit(2), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3(), messageId2());
+    }
+
+    @Test
+    public void 
listMailboxContentSinceSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads()
 {
+        testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+
+        
assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(),
 DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3());
+    }
+
+    @Test
+    public void 
listMailboxContentSinceSortedByReceivedAtShouldApplyLimitWithCollapseThreads() {
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
ThreadId.fromBaseMessageId(TestMessageId.of(2))).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
ThreadId.fromBaseMessageId(TestMessageId.of(3))).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), 
threadId()).block();
+
+        
assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(),
 DATE_4, Limit.limit(2), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId3(), messageId2());
+    }
+
+    @Test
+    public void 
listMailboxContentBeforeSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads()
 {
+        testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
threadId()).block();
 
+        
assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), 
DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId1());
+    }
+
+    @Test
+    public void 
listMailboxContentBeforeSortedByReceivedAtShouldApplyLimitWithCollapseThreads() 
{
+        testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), 
ThreadId.fromBaseMessageId(TestMessageId.of(2))).block();
+        testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), 
threadId()).block();
+        testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), 
ThreadId.fromBaseMessageId(TestMessageId.of(3))).block();
+        testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), 
threadId()).block();
+
+        
assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), 
DATE_5, Limit.limit(2), COLLAPSE_THREAD).collectList().block())
+            .containsExactly(messageId4(), messageId1());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to