This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 915bcc33f2 JAMES-3997 Netty backpressure for IMAP FETCH (#2031) 915bcc33f2 is described below commit 915bcc33f297f44f159a9fac4eead4e42bebd590 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Sun Feb 25 22:58:19 2024 +0100 JAMES-3997 Netty backpressure for IMAP FETCH (#2031) Channel writability is the netty trick for backpressure: as a server you shall avoid writing to a channel that is not writable. In order to apply this Netty best practice to Apache James we relied on: - The ImapSession as a mean to communicate between the processor and the channel. - A sink to bridge the gap from a push oriented architecture (our IMAP stack) to a pull one (netty writability). --- .../apache/james/imap/api/process/ImapSession.java | 13 +++++ .../james/imap/processor/fetch/FetchProcessor.java | 66 ++++++++++++++++++++-- .../netty/ImapChannelUpstreamHandler.java | 7 +++ .../james/imapserver/netty/NettyConstants.java | 1 + .../james/imapserver/netty/NettyImapSession.java | 10 ++++ 5 files changed, 92 insertions(+), 5 deletions(-) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java index 5c2bccceb5..94498ef9ce 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java @@ -102,6 +102,19 @@ public interface ImapSession extends CommandDetectionSession { */ Mono<Void> logout(); + /** + * Allows implementation to apply back pressure on heavy senders. + * + * Return true if the sender needs to be throttled. + * Return false if backpressure do not need to be applied. + * @param restoreBackpressure will be called to restore backpressure to its current state when backpressure + * is no longer needed. + */ + default boolean backpressureNeeded(Runnable restoreBackpressure) { + // Naive implementation: never backpressure + return false; + } + /** * Gets the current client state. * diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java index 65a437324f..2caf9a0e7d 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java @@ -28,6 +28,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import javax.inject.Inject; @@ -59,6 +60,8 @@ import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.AuditTrail; import org.apache.james.util.MDCBuilder; import org.apache.james.util.ReactorUtils; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +73,58 @@ import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { + static class FetchSubscriber implements Subscriber<FetchResponse> { + private final AtomicReference<Subscription> subscription = new AtomicReference<>(); + private final Sinks.One<Void> sink = Sinks.one(); + private final ImapSession imapSession; + private final Responder responder; + + FetchSubscriber(ImapSession imapSession, Responder responder) { + this.imapSession = imapSession; + this.responder = responder; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription.set(subscription); + subscription.request(1); + } + + @Override + public void onNext(FetchResponse fetchResponse) { + responder.respond(fetchResponse); + if (imapSession.backpressureNeeded(this::requestOne)) { + LOGGER.debug("Applying backpressure as we encounter a slow reader"); + } else { + requestOne(); + } + } + + private void requestOne() { + Optional.ofNullable(subscription.get()) + .ifPresent(s -> s.request(1)); + } + + @Override + public void onError(Throwable throwable) { + subscription.set(null); + sink.tryEmitError(throwable); + } + + @Override + public void onComplete() { + subscription.set(null); + sink.tryEmitEmpty(); + } + + public Mono<Void> completionMono() { + return sink.asMono(); + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(FetchProcessor.class); @Inject @@ -148,7 +201,7 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { respondVanished(selected, ranges, responder); } boolean omitExpunged = (!request.isUseUids()); - return processMessageRanges(selected, mailbox, ranges, fetch, mailboxSession, responder) + return processMessageRanges(selected, mailbox, ranges, fetch, mailboxSession, responder, session) // Don't send expunge responses if FETCH is used to trigger this // processor. See IMAP-284 .then(unsolicitedResponses(session, responder, omitExpunged, request.isUseUids())) @@ -169,7 +222,7 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { * Process the given message ranges by fetch them and pass them to the * {@link org.apache.james.imap.api.process.ImapProcessor.Responder} */ - private Mono<Void> processMessageRanges(SelectedMailbox selected, MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, MailboxSession mailboxSession, Responder responder) throws MailboxException { + private Mono<Void> processMessageRanges(SelectedMailbox selected, MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, MailboxSession mailboxSession, Responder responder, ImapSession imapSession) { FetchResponseBuilder builder = new FetchResponseBuilder(new EnvelopeBuilder()); FetchGroup resultToFetch = FetchDataConverter.getFetchGroup(fetch); @@ -183,12 +236,15 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { } else { return Flux.fromIterable(consolidate(selected, ranges, fetch)) .concatMap(range -> { + FetchSubscriber fetchSubscriber = new FetchSubscriber(imapSession, responder); auditTrail(mailbox, mailboxSession, resultToFetch, range); - return Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession)) + + Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession)) .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, builder, selected, result)) - .doOnNext(responder::respond) - .then(); + .subscribe(fetchSubscriber); + + return fetchSubscriber.completionMono(); }) .then(); } diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index 990a33abd7..c0ef774c19 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -218,7 +218,14 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp response.flush(); super.channelActive(ctx); } + } + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + if (ctx.channel().isWritable()) { + Optional.ofNullable(ctx.channel().attr(BACKPRESSURE_CALLBACK).get()) + .ifPresent(Runnable::run); + } } private void performConnectionCheck(InetSocketAddress clientIp) { diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java index ca83f40cde..2af87cd415 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java @@ -43,6 +43,7 @@ public interface NettyConstants { AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("ImapSession"); AttributeKey<Linearalizer> LINEARALIZER_ATTRIBUTE_KEY = AttributeKey.valueOf("Linearalizer"); AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = AttributeKey.valueOf("requestInFlight"); + AttributeKey<Runnable> BACKPRESSURE_CALLBACK = AttributeKey.valueOf("BACKPRESSURE_CALLBACK"); AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY = AttributeKey.valueOf("FrameDecoderMap"); } diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java index 4325d329e2..241df87691 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java @@ -317,4 +317,14 @@ public class NettyImapSession implements ImapSession, NettyConstants { public void schedule(Runnable runnable, Duration waitDelay) { channel.eventLoop().schedule(runnable, waitDelay.toMillis(), TimeUnit.MILLISECONDS); } + + @Override + public boolean backpressureNeeded(Runnable restoreBackpressure) { + boolean writable = channel.isWritable(); + if (!writable) { + channel.attr(BACKPRESSURE_CALLBACK).set(restoreBackpressure); + return true; + } + return false; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org