chibenwa commented on code in PR #2514:
URL: https://github.com/apache/james-project/pull/2514#discussion_r1855134539
##########
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java:
##########
@@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain>
domain) {
public static final SortField<Long> DEFAULT_SORT_ORDER_BY =
MESSAGE_UID.asc();
- private static SelectFinalStep<Record1<Long>>
selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId,
Condition extraCondition, Limit limit, DSLContext dslContext) {
- SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit =
dslContext.select(MESSAGE_UID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq((mailboxId.asUuid())))
- .and(extraCondition)
- .orderBy(MESSAGE_UID.asc());
- return limit.getLimit().map(limitValue ->
(SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue))
- .orElse(queryWithoutLimit);
- }
-
private final PostgresExecutor postgresExecutor;
+ private final int queryBatchSize;
- public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) {
+ public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor,
+ int queryBatchSize) {
this.postgresExecutor = postgresExecutor;
+ this.queryBatchSize = queryBatchSize;
}
public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId
mailboxId) {
- return postgresExecutor.executeRow(dslContext ->
Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
- IS_SEEN.eq(false), Limit.limit(1), dslContext)))
+ return postgresExecutor.executeRow(dslContext ->
Flux.from(dslContext.select(MESSAGE_UID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+ .and(IS_SEEN.eq(false))
+ .orderBy(DEFAULT_SORT_ORDER_BY)
+ .limit(1)))
.map(RECORD_TO_MESSAGE_UID_FUNCTION);
}
public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) {
- return postgresExecutor.executeRows(dslContext ->
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
- IS_SEEN.eq(false), Limit.unlimited(), dslContext)))
- .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+ return Flux.range(0, OFFSET_UNLIMITED)
+ .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex *
queryBatchSize, queryBatchSize))
+ .takeUntil(List::isEmpty)
+ .flatMapIterable(Function.identity());
+ }
+
+ private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId
mailboxId, int offset, int batchSize) {
+ return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_UID)
Review Comment:
No Really for UID I strongly believe .collectList would be better.
> the batch help we early release connection to pool
Wrong assumption: yes this wont lock one connection the entire time but we'd
have many request: usage would be higher.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]