This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f5d1b7355927cd4b82e4efd09d0e5fd320b1941d
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Sat Mar 16 08:44:44 2024 +0100

    [FIX] IMAP FETCH backpressure propagate cancel
---
 .../james/imap/processor/fetch/FetchProcessor.java  | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
index 2caf9a0e7d..83578d358b 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
@@ -121,7 +121,11 @@ public class FetchProcessor extends 
AbstractMailboxProcessor<FetchRequest> {
         }
 
         public Mono<Void> completionMono() {
-            return sink.asMono();
+            return sink.asMono()
+                .doOnCancel(() -> {
+                    
Optional.ofNullable(subscription.get()).ifPresent(Subscription::cancel);
+                    subscription.set(null);
+                });
         }
     }
 
@@ -234,19 +238,18 @@ public class FetchProcessor extends 
AbstractMailboxProcessor<FetchRequest> {
                 .doOnNext(responder::respond)
                 .then();
         } else {
-            return Flux.fromIterable(consolidate(selected, ranges, fetch))
+            FetchSubscriber fetchSubscriber = new FetchSubscriber(imapSession, 
responder);
+            Flux.fromIterable(consolidate(selected, ranges, fetch))
                 .concatMap(range -> {
-                    FetchSubscriber fetchSubscriber = new 
FetchSubscriber(imapSession, responder);
                     auditTrail(mailbox, mailboxSession, resultToFetch, range);
 
-                    Flux.from(mailbox.getMessagesReactive(range, 
resultToFetch, mailboxSession))
+                    return Flux.from(mailbox.getMessagesReactive(range, 
resultToFetch, mailboxSession))
                         .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
-                        .concatMap(result -> toResponse(mailbox, fetch, 
mailboxSession, builder, selected, result))
-                        .subscribe(fetchSubscriber);
-
-                    return fetchSubscriber.completionMono();
+                        .concatMap(result -> toResponse(mailbox, fetch, 
mailboxSession, builder, selected, result));
                 })
-                .then();
+                .subscribe(fetchSubscriber);
+
+            return fetchSubscriber.completionMono();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to