This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 7f30707eb3 [FIX] Async connection checks in IMAP (#3009)
7f30707eb3 is described below
commit 7f30707eb3eab0f442d51250f62a79ed3d13d984
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Apr 13 08:14:38 2026 +0200
[FIX] Async connection checks in IMAP (#3009)
---
.../imapserver/netty/HAProxyMessageHandler.java | 71 ++++++++++++----------
.../netty/ImapChannelUpstreamHandler.java | 18 ++++--
2 files changed, 52 insertions(+), 37 deletions(-)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java
index 463fbadf4b..6b167834d6 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/HAProxyMessageHandler.java
@@ -32,6 +32,8 @@ import org.apache.james.util.MDCBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.fge.lambdas.Throwing;
+
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
@@ -39,6 +41,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.util.AttributeKey;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class HAProxyMessageHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER =
LoggerFactory.getLogger(HAProxyMessageHandler.class);
@@ -60,44 +63,50 @@ public class HAProxyMessageHandler extends
ChannelInboundHandlerAdapter {
}
}
- private void handleHAProxyMessage(ChannelHandlerContext ctx,
HAProxyMessage haproxyMsg) throws Exception {
- try {
-
- ChannelPipeline pipeline = ctx.pipeline();
- ImapSession imapSession = (ImapSession)
pipeline.channel().attr(SESSION_ATTRIBUTE_KEY).get();
- if
(haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP4) ||
haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP6)) {
-
- InetSocketAddress sourceIP = new
InetSocketAddress(haproxyMsg.sourceAddress(), haproxyMsg.sourcePort());
- ctx.channel().attr(PROXY_INFO).set(
- new ProxyInformation(
- sourceIP,
- new InetSocketAddress(haproxyMsg.destinationAddress(),
haproxyMsg.destinationPort())));
-
- LOGGER.info("Connection from {} runs through {} proxy",
haproxyMsg.sourceAddress(), haproxyMsg.destinationAddress());
- // Refresh MDC info to account for proxying
- MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx);
+ private void handleHAProxyMessage(ChannelHandlerContext ctx,
HAProxyMessage haproxyMsg) {
+ ChannelPipeline pipeline = ctx.pipeline();
+ ImapSession imapSession = (ImapSession)
pipeline.channel().attr(SESSION_ATTRIBUTE_KEY).get();
+ if (haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP4)
|| haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP6)) {
+
+ InetSocketAddress sourceIP = new
InetSocketAddress(haproxyMsg.sourceAddress(), haproxyMsg.sourcePort());
+ ctx.channel().attr(PROXY_INFO).set(
+ new ProxyInformation(
+ sourceIP,
+ new InetSocketAddress(haproxyMsg.destinationAddress(),
haproxyMsg.destinationPort())));
+
+ LOGGER.info("Connection from {} runs through {} proxy",
haproxyMsg.sourceAddress(), haproxyMsg.destinationAddress());
+ // Refresh MDC info to account for proxying
+ MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx);
+
+ // Pause reading while the connection check is in progress to prevent IMAP
+ // commands from being processed before the check completes.
+ ctx.channel().config().setAutoRead(false);
+ performConnectionCheck(sourceIP)
+ .then(Mono.fromRunnable(() -> {
+ if (imapSession != null) {
+ imapSession.setAttribute(MDC_KEY, boundMDC);
+ }
+ }))
+ .doFinally(any -> haproxyMsg.release())
+ .subscribe(any -> {
+ }, error -> ctx.executor().execute(() ->
ctx.pipeline().fireExceptionCaught(error)),
+ () -> ctx.executor().execute(Throwing.runnable(() -> {
+ ctx.channel().config().setAutoRead(true);
+ super.channelReadComplete(ctx);
+ })));
- performConnectionCheck(sourceIP);
-
- if (imapSession != null) {
- imapSession.setAttribute(MDC_KEY, boundMDC);
- }
- } else {
- throw new IllegalArgumentException("Only TCP4/TCP6 are supported when using PROXY protocol.");
- }
-
- super.channelReadComplete(ctx);
- } finally {
+ } else {
haproxyMsg.release();
+ throw new IllegalArgumentException("Only TCP4/TCP6 are supported when using PROXY protocol.");
}
}
- private void performConnectionCheck(InetSocketAddress realClientIp) {
+ private Mono<Void> performConnectionCheck(InetSocketAddress realClientIp) {
if (!connectionChecks.isEmpty()) {
- Flux.fromIterable(connectionChecks)
+ return Flux.fromIterable(connectionChecks)
.concatMap(connectionCheck ->
connectionCheck.validate(realClientIp))
- .then()
- .block();
+ .then();
}
+ return Mono.empty();
}
}
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 8d478926ed..e4435a7786 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -212,7 +212,7 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
}
@Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) {
imapChannelGroup.add(ctx.channel());
SessionId sessionId = SessionId.generate();
ImapSession imapsession = new NettyImapSession(ctx.channel(), secure,
compress, authenticationConfiguration.isSSLRequired(),
@@ -225,8 +225,14 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
.addToContext(MDCBuilder.SESSION_ID, sessionId.asString());
imapsession.setAttribute(MDC_KEY, boundMDC);
- performConnectionCheck(imapsession.getRemoteAddress());
+ performConnectionCheck(imapsession.getRemoteAddress())
+ .then(Mono.fromRunnable(() ->
ctx.executor().execute(Throwing.runnable(() -> acceptConnection(ctx,
imapsession)))))
+ .subscribe(any -> {
+
+ }, error -> ctx.executor().execute(() ->
ctx.pipeline().fireExceptionCaught(error)));
+ }
+ private void acceptConnection(ChannelHandlerContext ctx, ImapSession
imapsession) throws Exception {
try (Closeable closeable = mdc(imapsession).build()) {
InetSocketAddress address = (InetSocketAddress)
ctx.channel().remoteAddress();
LOGGER.info("Connection established from {}",
address.getAddress().getHostAddress());
@@ -249,13 +255,13 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
}
}
- private void performConnectionCheck(InetSocketAddress clientIp) {
+ private Mono<Void> performConnectionCheck(InetSocketAddress clientIp) {
if (!connectionChecks.isEmpty() && !proxyRequired) {
- Flux.fromIterable(connectionChecks)
+ return Flux.fromIterable(connectionChecks)
.concatMap(connectionCheck ->
connectionCheck.validate(clientIp))
- .then()
- .block();
+ .then();
}
+ return Mono.empty();
}
private MDCBuilder mdc(ChannelHandlerContext ctx) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]