chibenwa commented on code in PR #2792:
URL: https://github.com/apache/james-project/pull/2792#discussion_r2577394467


##########
server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java:
##########
@@ -174,84 +185,102 @@ public CassandraEmailQueryView(CqlSession session) {
     }
 
     @Override
-    public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId 
mailboxId, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
+    public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return executor.executeRows(listMailboxContentBySentAt.bind()
-                .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
-                .setInt(LIMIT_MARKER, limit.getLimit().get()))
-            .map(row -> CassandraMessageId.Factory.of(row.get(0, 
TypeCodecs.UUID)));
+
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentBySentAt.bind()
+                    .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                    .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     @Override
-    public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId 
mailboxId, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
+    public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return executor.executeRows(listMailboxContentByReceivedAt.bind()
-            .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
-            .setInt(LIMIT_MARKER, limit.getLimit().get()))
-            .map(row -> CassandraMessageId.Factory.of(row.get(0, 
TypeCodecs.UUID)));
+
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentByReceivedAt.bind()
+                    .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                    .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
     @Override
-    public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit) {
+    public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit, boolean collapseThreads) {
         Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
 
         CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return executor.executeRows(listMailboxContentSinceReceivedAt.bind()
+        Flux<EmailEntry> baseEntries = 
executor.executeRows(listMailboxContentSinceReceivedAt.bind()
                 .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
                 .setInstant(RECEIVED_AT, since.toInstant()))
-            .map(row -> {
-                CassandraMessageId messageId = 
CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
-                Instant sentAt = row.getInstant(SENT_AT);
-
-                return Pair.of(messageId, sentAt);
-            })
-            .sort(Comparator.<Pair<CassandraMessageId, Instant>, 
Instant>comparing(Pair::getValue).reversed())
-            .map(pair -> (MessageId) pair.getKey())
+            .map(asEmailEntry(SENT_AT));
+
+        if (collapseThreads) {
+            return baseEntries.groupBy(EmailEntry::getThreadId)
+                .flatMap(group -> group.reduce((e1, e2) ->
+                    e1.getMessageDate().isAfter(e2.getMessageDate()) ? e1 : 
e2))
+                
.sort(Comparator.comparing(EmailEntry::getMessageDate).reversed())
+                .map(EmailEntry::getMessageId)
+                .take(limit.getLimit().get());
+        }
+        return 
baseEntries.sort(Comparator.comparing(EmailEntry::getMessageDate).reversed())
+            .map(EmailEntry::getMessageId)
             .take(limit.getLimit().get());
     }
 
     @Override
-    public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
+    public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
         CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return executor.executeRows(listMailboxContentSinceReceivedAt.bind()
-            .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
-            .setInstant(RECEIVED_AT, since.toInstant()))
-            .<MessageId>map(row -> CassandraMessageId.Factory.of(row.get(0, 
TypeCodecs.UUID)))
-            .take(limit.getLimit().get());
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentSinceReceivedAt.bind()
+                    .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                    .setInstant(RECEIVED_AT, since.toInstant())
+                    .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+                .map(asEmailEntry(SENT_AT)));
     }
 
     @Override
-    public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
+    public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime 
since, Limit limit, boolean collapseThreads) {
         CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return executor.executeRows(listMailboxContentBeforeReceivedAt.bind()
-            .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
-            .setInstant(RECEIVED_AT, since.toInstant()))
-            .<MessageId>map(row -> CassandraMessageId.Factory.of(row.get(0, 
TypeCodecs.UUID)))
-            .take(limit.getLimit().get());
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentBeforeReceivedAt.bind()
+                        .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                        .setInstant(RECEIVED_AT, since.toInstant())
+                        .setInt(LIMIT_MARKER, 
backendFetchLimit.getLimit().get()))
+                    .map(asEmailEntry(SENT_AT)));
     }
 
     @Override
-    public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
+    public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
         CassandraId cassandraId = (CassandraId) mailboxId;
 
-        return executor.executeRows(listMailboxContentSinceSentAt.bind()
-            .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
-            .setInt(LIMIT_MARKER, limit.getLimit().get())
-            .setInstant(SENT_AT, since.toInstant()))
-            .map(row -> CassandraMessageId.Factory.of(row.get(0, 
TypeCodecs.UUID)));
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
executor.executeRows(listMailboxContentSinceSentAt.bind()
+                .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID)
+                .setInstant(SENT_AT, since.toInstant())
+                .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get()))
+            .map(asEmailEntry(SENT_AT)));
+    }
+
+    private Function<Row, EmailEntry> asEmailEntry(CqlIdentifier dateField) {
+        return (Row row) -> {
+            CassandraMessageId messageId = 
CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
+            ThreadId threadId = getThreadIdFromRow(row, messageId);
+            Instant messageDate = row.getInstant(dateField);
+            return new EmailEntry(messageId, threadId, messageDate);
+        };
+    }
+
+    private ThreadId getThreadIdFromRow(Row row, MessageId messageId) {
+        UUID threadIdUUID = row.get(CassandraEmailQueryViewTable.THREAD_ID, 
TypeCodecs.TIMEUUID);
+        if (threadIdUUID == null) {
+            return ThreadId.fromBaseMessageId(messageId);
+        }
+        return 
ThreadId.fromBaseMessageId(CassandraMessageId.Factory.of(threadIdUUID));

Review Comment:
   Can we use Optional ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to