This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 915bcc33f2 JAMES-3997 Netty backpressure for IMAP FETCH (#2031)
915bcc33f2 is described below

commit 915bcc33f297f44f159a9fac4eead4e42bebd590
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Sun Feb 25 22:58:19 2024 +0100

    JAMES-3997 Netty backpressure for IMAP FETCH (#2031)
    
    Channel writability is the Netty trick for backpressure:
    as a server you shall avoid writing to a channel that is not
    writable.
    
    In order to apply this Netty best practice to Apache James we
    relied on:
     - The ImapSession as a means to communicate between the
      processor and the channel.
     - A sink to bridge the gap from a push-oriented architecture
     (our IMAP stack) to a pull-oriented one (Netty writability).
---
 .../apache/james/imap/api/process/ImapSession.java | 13 +++++
 .../james/imap/processor/fetch/FetchProcessor.java | 66 ++++++++++++++++++++--
 .../netty/ImapChannelUpstreamHandler.java          |  7 +++
 .../james/imapserver/netty/NettyConstants.java     |  1 +
 .../james/imapserver/netty/NettyImapSession.java   | 10 ++++
 5 files changed, 92 insertions(+), 5 deletions(-)

diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
index 5c2bccceb5..94498ef9ce 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
@@ -102,6 +102,19 @@ public interface ImapSession extends 
CommandDetectionSession {
      */
     Mono<Void> logout();
 
+    /**
+     * Allows implementation to apply back pressure on heavy senders.
+     *
+     * Return true if the sender needs to be throttled.
+     * Return false if backpressure does not need to be applied.
+     * @param restoreBackpressure callback invoked once backpressure is no longer
+     *                            needed, so that the throttled sender can resume.
+     */
+    default boolean backpressureNeeded(Runnable restoreBackpressure) {
+        // Naive implementation: never backpressure
+        return false;
+    }
+
     /**
      * Gets the current client state.
      * 
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
index 65a437324f..2caf9a0e7d 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
@@ -28,6 +28,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.inject.Inject;
 
@@ -59,6 +60,8 @@ import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.AuditTrail;
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,8 +73,58 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
 
 public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> {
+    static class FetchSubscriber implements Subscriber<FetchResponse> {
+        private final AtomicReference<Subscription> subscription = new 
AtomicReference<>();
+        private final Sinks.One<Void> sink = Sinks.one();
+        private final ImapSession imapSession;
+        private final Responder responder;
+
+        FetchSubscriber(ImapSession imapSession, Responder responder) {
+            this.imapSession = imapSession;
+            this.responder = responder;
+        }
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            this.subscription.set(subscription);
+            subscription.request(1);
+        }
+
+        @Override
+        public void onNext(FetchResponse fetchResponse) {
+            responder.respond(fetchResponse);
+            if (imapSession.backpressureNeeded(this::requestOne)) {
+                LOGGER.debug("Applying backpressure as we encounter a slow 
reader");
+            } else {
+                requestOne();
+            }
+        }
+
+        private void requestOne() {
+            Optional.ofNullable(subscription.get())
+                .ifPresent(s -> s.request(1));
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            subscription.set(null);
+            sink.tryEmitError(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            subscription.set(null);
+            sink.tryEmitEmpty();
+        }
+
+        public Mono<Void> completionMono() {
+            return sink.asMono();
+        }
+    }
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FetchProcessor.class);
 
     @Inject
@@ -148,7 +201,7 @@ public class FetchProcessor extends 
AbstractMailboxProcessor<FetchRequest> {
             respondVanished(selected, ranges, responder);
         }
         boolean omitExpunged = (!request.isUseUids());
-        return processMessageRanges(selected, mailbox, ranges, fetch, 
mailboxSession, responder)
+        return processMessageRanges(selected, mailbox, ranges, fetch, 
mailboxSession, responder, session)
             // Don't send expunge responses if FETCH is used to trigger this
             // processor. See IMAP-284
             .then(unsolicitedResponses(session, responder, omitExpunged, 
request.isUseUids()))
@@ -169,7 +222,7 @@ public class FetchProcessor extends 
AbstractMailboxProcessor<FetchRequest> {
      * Process the given message ranges by fetch them and pass them to the
      * {@link org.apache.james.imap.api.process.ImapProcessor.Responder}
      */
-    private Mono<Void> processMessageRanges(SelectedMailbox selected, 
MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, 
MailboxSession mailboxSession, Responder responder) throws MailboxException {
+    private Mono<Void> processMessageRanges(SelectedMailbox selected, 
MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, 
MailboxSession mailboxSession, Responder responder, ImapSession imapSession) {
         FetchResponseBuilder builder = new FetchResponseBuilder(new 
EnvelopeBuilder());
         FetchGroup resultToFetch = FetchDataConverter.getFetchGroup(fetch);
 
@@ -183,12 +236,15 @@ public class FetchProcessor extends 
AbstractMailboxProcessor<FetchRequest> {
         } else {
             return Flux.fromIterable(consolidate(selected, ranges, fetch))
                 .concatMap(range -> {
+                    FetchSubscriber fetchSubscriber = new 
FetchSubscriber(imapSession, responder);
                     auditTrail(mailbox, mailboxSession, resultToFetch, range);
-                    return Flux.from(mailbox.getMessagesReactive(range, 
resultToFetch, mailboxSession))
+
+                    Flux.from(mailbox.getMessagesReactive(range, 
resultToFetch, mailboxSession))
                         .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
                         .concatMap(result -> toResponse(mailbox, fetch, 
mailboxSession, builder, selected, result))
-                        .doOnNext(responder::respond)
-                        .then();
+                        .subscribe(fetchSubscriber);
+
+                    return fetchSubscriber.completionMono();
                 })
                 .then();
         }
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 990a33abd7..c0ef774c19 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -218,7 +218,14 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
             response.flush();
             super.channelActive(ctx);
         }
+    }
 
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
+        if (ctx.channel().isWritable()) {
+            
Optional.ofNullable(ctx.channel().attr(BACKPRESSURE_CALLBACK).get())
+                .ifPresent(Runnable::run);
+        }
     }
 
     private void performConnectionCheck(InetSocketAddress clientIp) {
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
index ca83f40cde..2af87cd415 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
@@ -43,6 +43,7 @@ public interface NettyConstants {
     AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = 
AttributeKey.valueOf("ImapSession");
     AttributeKey<Linearalizer> LINEARALIZER_ATTRIBUTE_KEY = 
AttributeKey.valueOf("Linearalizer");
     AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = 
AttributeKey.valueOf("requestInFlight");
+    AttributeKey<Runnable> BACKPRESSURE_CALLBACK = 
AttributeKey.valueOf("BACKPRESSURE_CALLBACK");
     AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY  = 
AttributeKey.valueOf("FrameDecoderMap");
 
 }
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
index 4325d329e2..241df87691 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
@@ -317,4 +317,14 @@ public class NettyImapSession implements ImapSession, 
NettyConstants {
     public void schedule(Runnable runnable, Duration waitDelay) {
         channel.eventLoop().schedule(runnable, waitDelay.toMillis(), 
TimeUnit.MILLISECONDS);
     }
+
+    @Override
+    public boolean backpressureNeeded(Runnable restoreBackpressure) {
+        boolean writable = channel.isWritable();
+        if (!writable) {
+            channel.attr(BACKPRESSURE_CALLBACK).set(restoreBackpressure);
+            return true;
+        }
+        return false;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to