This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f30707eb3 [FIX] Async connection checks in IMAP (#3009)
7f30707eb3 is described below

commit 7f30707eb3eab0f442d51250f62a79ed3d13d984
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Apr 13 08:14:38 2026 +0200

    [FIX] Async connection checks in IMAP (#3009)
---
 .../imapserver/netty/HAProxyMessageHandler.java    | 71 ++++++++++++----------
 .../netty/ImapChannelUpstreamHandler.java          | 18 ++++--
 2 files changed, 52 insertions(+), 37 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java
index 463fbadf4b..6b167834d6 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java
@@ -32,6 +32,8 @@ import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
@@ -39,6 +41,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.util.AttributeKey;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAProxyMessageHandler.class);
@@ -60,44 +63,50 @@ public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
         }
     }
 
-    private void handleHAProxyMessage(ChannelHandlerContext ctx, 
HAProxyMessage haproxyMsg) throws Exception {
-        try {
-
-            ChannelPipeline pipeline = ctx.pipeline();
-            ImapSession imapSession = (ImapSession) 
pipeline.channel().attr(SESSION_ATTRIBUTE_KEY).get();
-            if 
(haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP4) || 
haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP6)) {
-
-                InetSocketAddress sourceIP = new 
InetSocketAddress(haproxyMsg.sourceAddress(), haproxyMsg.sourcePort());
-                ctx.channel().attr(PROXY_INFO).set(
-                    new ProxyInformation(
-                        sourceIP,
-                        new InetSocketAddress(haproxyMsg.destinationAddress(), 
haproxyMsg.destinationPort())));
-
-                LOGGER.info("Connection from {} runs through {} proxy", 
haproxyMsg.sourceAddress(), haproxyMsg.destinationAddress());
-                // Refresh MDC info to account for proxying
-                MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx);
+    private void handleHAProxyMessage(ChannelHandlerContext ctx, 
HAProxyMessage haproxyMsg) {
+        ChannelPipeline pipeline = ctx.pipeline();
+        ImapSession imapSession = (ImapSession) 
pipeline.channel().attr(SESSION_ATTRIBUTE_KEY).get();
+        if (haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP4) 
|| haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP6)) {
+
+            InetSocketAddress sourceIP = new 
InetSocketAddress(haproxyMsg.sourceAddress(), haproxyMsg.sourcePort());
+            ctx.channel().attr(PROXY_INFO).set(
+                new ProxyInformation(
+                    sourceIP,
+                    new InetSocketAddress(haproxyMsg.destinationAddress(), 
haproxyMsg.destinationPort())));
+
+            LOGGER.info("Connection from {} runs through {} proxy", 
haproxyMsg.sourceAddress(), haproxyMsg.destinationAddress());
+            // Refresh MDC info to account for proxying
+            MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx);
+
+            // Pause reading while the connection check is in progress to prevent IMAP
+            // commands from being processed before the check completes.
+            ctx.channel().config().setAutoRead(false);
+            performConnectionCheck(sourceIP)
+                .then(Mono.fromRunnable(() -> {
+                    if (imapSession != null) {
+                        imapSession.setAttribute(MDC_KEY, boundMDC);
+                    }
+                }))
+                .doFinally(any -> haproxyMsg.release())
+                .subscribe(any -> {
+                }, error -> ctx.executor().execute(() -> 
ctx.pipeline().fireExceptionCaught(error)),
+                    () -> ctx.executor().execute(Throwing.runnable(() -> {
+                        ctx.channel().config().setAutoRead(true);
+                        super.channelReadComplete(ctx);
+                    })));
 
-                performConnectionCheck(sourceIP);
-
-                if (imapSession != null) {
-                    imapSession.setAttribute(MDC_KEY, boundMDC);
-                }
-            } else {
-                throw new IllegalArgumentException("Only TCP4/TCP6 are 
supported when using PROXY protocol.");
-            }
-
-            super.channelReadComplete(ctx);
-        } finally {
+        } else {
             haproxyMsg.release();
+            throw new IllegalArgumentException("Only TCP4/TCP6 are supported 
when using PROXY protocol.");
         }
     }
 
-    private void performConnectionCheck(InetSocketAddress realClientIp) {
+    private Mono<Void> performConnectionCheck(InetSocketAddress realClientIp) {
         if (!connectionChecks.isEmpty()) {
-            Flux.fromIterable(connectionChecks)
+            return Flux.fromIterable(connectionChecks)
                 .concatMap(connectionCheck -> 
connectionCheck.validate(realClientIp))
-                .then()
-                .block();
+                .then();
         }
+        return Mono.empty();
     }
 }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 8d478926ed..e4435a7786 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -212,7 +212,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    public void channelActive(ChannelHandlerContext ctx) {
         imapChannelGroup.add(ctx.channel());
         SessionId sessionId = SessionId.generate();
         ImapSession imapsession = new NettyImapSession(ctx.channel(), secure, 
compress, authenticationConfiguration.isSSLRequired(),
@@ -225,8 +225,14 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
             .addToContext(MDCBuilder.SESSION_ID, sessionId.asString());
         imapsession.setAttribute(MDC_KEY, boundMDC);
 
-        performConnectionCheck(imapsession.getRemoteAddress());
+        performConnectionCheck(imapsession.getRemoteAddress())
+            .then(Mono.fromRunnable(() -> 
ctx.executor().execute(Throwing.runnable(() -> acceptConnection(ctx, 
imapsession)))))
+            .subscribe(any -> {
+
+            }, error -> ctx.executor().execute(() -> 
ctx.pipeline().fireExceptionCaught(error)));
+    }
 
+    private void acceptConnection(ChannelHandlerContext ctx, ImapSession 
imapsession) throws Exception {
         try (Closeable closeable = mdc(imapsession).build()) {
             InetSocketAddress address = (InetSocketAddress) 
ctx.channel().remoteAddress();
             LOGGER.info("Connection established from {}", 
address.getAddress().getHostAddress());
@@ -249,13 +255,13 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
         }
     }
 
-    private void performConnectionCheck(InetSocketAddress clientIp) {
+    private Mono<Void> performConnectionCheck(InetSocketAddress clientIp) {
         if (!connectionChecks.isEmpty() && !proxyRequired) {
-            Flux.fromIterable(connectionChecks)
+            return Flux.fromIterable(connectionChecks)
                 .concatMap(connectionCheck -> 
connectionCheck.validate(clientIp))
-                .then()
-                .block();
+                .then();
         }
+        return Mono.empty();
     }
 
     private MDCBuilder mdc(ChannelHandlerContext ctx) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to