This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f5d1b7355927cd4b82e4efd09d0e5fd320b1941d Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Sat Mar 16 08:44:44 2024 +0100 [FIX] IMAP FETCH backpressure propagate cancel --- .../james/imap/processor/fetch/FetchProcessor.java | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java index 2caf9a0e7d..83578d358b 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java @@ -121,7 +121,11 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { } public Mono<Void> completionMono() { - return sink.asMono(); + return sink.asMono() + .doOnCancel(() -> { + Optional.ofNullable(subscription.get()).ifPresent(Subscription::cancel); + subscription.set(null); + }); } } @@ -234,19 +238,18 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { .doOnNext(responder::respond) .then(); } else { - return Flux.fromIterable(consolidate(selected, ranges, fetch)) + FetchSubscriber fetchSubscriber = new FetchSubscriber(imapSession, responder); + Flux.fromIterable(consolidate(selected, ranges, fetch)) .concatMap(range -> { - FetchSubscriber fetchSubscriber = new FetchSubscriber(imapSession, responder); auditTrail(mailbox, mailboxSession, resultToFetch, range); - Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession)) + return Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession)) .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) - .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, builder, selected, result)) - .subscribe(fetchSubscriber); - - return fetchSubscriber.completionMono(); + .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, builder, selected, result)); }) - .then(); + .subscribe(fetchSubscriber); + + return fetchSubscriber.completionMono(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org