This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 70dee40530 JAMES-3811 Ability to cancel IMAP request execution upon closed connections (#1267) 70dee40530 is described below commit 70dee405303b83c79c786d267b6eb4523f247e0e Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Wed Oct 26 09:17:01 2022 +0700 JAMES-3811 Ability to cancel IMAP request execution upon closed connections (#1267) --- .../james/imapserver/netty/ImapChannelUpstreamHandler.java | 12 +++++++++++- .../org/apache/james/imapserver/netty/NettyConstants.java | 2 ++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index 2bb3731940..b0dd0e66f6 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -57,6 +57,8 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.Attribute; +import reactor.core.Disposable; import reactor.core.publisher.Mono; /** @@ -210,6 +212,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp // remove the stored attribute for the channel to free up resources // See JAMES-1195 ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null); + Disposable disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null); Optional.ofNullable(imapSession) .map(ImapSession::logout) @@ -221,6 +224,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp .subscribe(any -> { }, ctx::fireExceptionCaught); + Optional.ofNullable(disposableAttribute).ifPresent(Disposable::dispose); } } @@ -272,6 +276,9 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp // logout on error not sure if that is the best way to handle it final ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); + Optional.ofNullable(ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null)) + .ifPresent(Disposable::dispose); + Optional.ofNullable(imapSession) .map(ImapSession::logout) .orElse(Mono.empty()) @@ -298,12 +305,13 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp public void channelRead(ChannelHandlerContext ctx, Object msg) { imapCommandsMetric.increment(); ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); + Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY); ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel())); ImapMessage message = (ImapMessage) msg; beforeIDLEUponProcessing(ctx); ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response); - reactiveThrottler.throttle( + Disposable disposable = reactiveThrottler.throttle( processor.processReactive(message, responseEncoder, session) .doOnEach(Throwing.consumer(signal -> { if (session.getState() == ImapSessionState.LOGOUT) { @@ -335,12 +343,14 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp if (signal.hasError()) { ctx.fireExceptionCaught(signal.getThrowable()); } + disposableAttribute.set(null); ctx.fireChannelReadComplete(); })) .contextWrite(ReactorUtils.context("imap", mdc(ctx))), message) // Manage throttling errors .doOnError(ctx::fireExceptionCaught) .subscribe(); + disposableAttribute.set(disposable); } private void beforeIDLEUponProcessing(ChannelHandlerContext ctx) { diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java index 3a62cb0931..0db9543968 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.james.imap.api.process.ImapSession; import io.netty.util.AttributeKey; +import reactor.core.Disposable; /** @@ -40,6 +41,7 @@ public interface NettyConstants { String HEARTBEAT_HANDLER = "heartbeatHandler"; AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("ImapSession"); + AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = AttributeKey.valueOf("requestInFlight"); AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY = AttributeKey.valueOf("FrameDecoderMap"); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org