chibenwa commented on code in PR #2792:
URL: https://github.com/apache/james-project/pull/2792#discussion_r2577396186


##########
server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java:
##########
@@ -25,99 +25,118 @@
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.RECEIVED_AT;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.SENT_AT;
 import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.TABLE_NAME;
+import static 
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.THREAD_ID;
 
+import java.time.Instant;
+import java.time.OffsetDateTime;
 import java.time.ZonedDateTime;
+import java.util.UUID;
+import java.util.function.Function;
 
 import jakarta.inject.Inject;
 import jakarta.inject.Named;
 
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
 import org.apache.james.mailbox.postgres.PostgresMessageId;
 import org.apache.james.util.streams.Limit;
-
-import com.google.common.base.Preconditions;
+import org.jooq.Field;
+import org.jooq.Record;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class PostgresEmailQueryViewDAO {
-    private PostgresExecutor postgresExecutor;
+    private final PostgresExecutor postgresExecutor;
 
     @Inject
     public 
PostgresEmailQueryViewDAO(@Named(PostgresExecutor.BY_PASS_RLS_INJECT) 
PostgresExecutor postgresExecutor) {
         this.postgresExecutor = postgresExecutor;
     }
 
-    public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId 
mailboxId, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .orderBy(SENT_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+    public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId 
mailboxId, Limit limit, boolean collapseThreads) {
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .orderBy(SENT_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(SENT_AT)));
     }
 
-    public Flux<MessageId> 
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .orderBy(RECEIVED_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+    public Flux<MessageId> 
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit, 
boolean collapseThreads) {
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .orderBy(RECEIVED_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
-    public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
-                .orderBy(SENT_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+    public Flux<MessageId> 
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
+                    .orderBy(SENT_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(SENT_AT)));
     }
 
-    public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
-
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
-                .orderBy(RECEIVED_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+    public Flux<MessageId> 
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
+                    .orderBy(RECEIVED_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(RECEIVED_AT)));
     }
 
-    public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
+    public Flux<MessageId> 
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, 
ZonedDateTime since, Limit limit, boolean collapseThreads) {
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
+                    .orderBy(RECEIVED_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(RECEIVED_AT)));
+    }
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
-                .orderBy(RECEIVED_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+    public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) {
+        return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+            .resolve(backendFetchLimit -> 
postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
+                    .orderBy(SENT_AT.desc())
+                    .limit(backendFetchLimit.getLimit().get())))
+                .map(asEmailEntry(SENT_AT)));
     }
 
-    public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId 
mailboxId, ZonedDateTime since, Limit limit) {
-        Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be 
defined");
+    private Function<Record, EmailEntry> asEmailEntry(Field<OffsetDateTime> 
dateField) {
+        return (Record record) -> {
+            PostgresMessageId messageId = 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID));
+            ThreadId threadId = getThreadIdFromRecord(record, messageId);
+            Instant messageDate = record.get(dateField).toInstant();
+            return new EmailEntry(messageId, threadId, messageDate);
+        };
+    }
 
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_ID)
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
-                .orderBy(SENT_AT.desc())
-                .limit(limit.getLimit().get())))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+    private ThreadId getThreadIdFromRecord(Record record, MessageId messageId) 
{
+        UUID threadIdUUID = record.get(THREAD_ID);
+        if (threadIdUUID == null) {
+            return ThreadId.fromBaseMessageId(messageId);
+        }
+        return 
ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(threadIdUUID));

Review Comment:
   Use Optional



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to