This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 70dee40530 JAMES-3811 Ability to cancel IMAP request execution upon closed connections (#1267)
70dee40530 is described below

commit 70dee405303b83c79c786d267b6eb4523f247e0e
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Wed Oct 26 09:17:01 2022 +0700

    JAMES-3811 Ability to cancel IMAP request execution upon closed connections (#1267)
---
 .../james/imapserver/netty/ImapChannelUpstreamHandler.java   | 12 +++++++++++-
 .../org/apache/james/imapserver/netty/NettyConstants.java    |  2 ++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 2bb3731940..b0dd0e66f6 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -57,6 +57,8 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.Attribute;
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 
 /**
@@ -210,6 +212,7 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
             // remove the stored attribute for the channel to free up resources
             // See JAMES-1195
             ImapSession imapSession = 
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null);
+            Disposable disposableAttribute = 
ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null);
 
             Optional.ofNullable(imapSession)
                 .map(ImapSession::logout)
@@ -221,6 +224,7 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
                 .subscribe(any -> {
 
                 }, ctx::fireExceptionCaught);
+            
Optional.ofNullable(disposableAttribute).ifPresent(Disposable::dispose);
         }
     }
 
@@ -272,6 +276,9 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
         // logout on error not sure if that is the best way to handle it
         final ImapSession imapSession = 
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
 
+        
Optional.ofNullable(ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null))
+            .ifPresent(Disposable::dispose);
+
         Optional.ofNullable(imapSession)
             .map(ImapSession::logout)
             .orElse(Mono.empty())
@@ -298,12 +305,13 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         imapCommandsMetric.increment();
         ImapSession session = 
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+        Attribute<Disposable> disposableAttribute = 
ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
         ImapResponseComposer response = new ImapResponseComposerImpl(new 
ChannelImapResponseWriter(ctx.channel()));
         ImapMessage message = (ImapMessage) msg;
 
         beforeIDLEUponProcessing(ctx);
         ResponseEncoder responseEncoder = new ResponseEncoder(encoder, 
response);
-        reactiveThrottler.throttle(
+        Disposable disposable = reactiveThrottler.throttle(
             processor.processReactive(message, responseEncoder, session)
                 .doOnEach(Throwing.consumer(signal -> {
                     if (session.getState() == ImapSessionState.LOGOUT) {
@@ -335,12 +343,14 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
                     if (signal.hasError()) {
                         ctx.fireExceptionCaught(signal.getThrowable());
                     }
+                    disposableAttribute.set(null);
                     ctx.fireChannelReadComplete();
                 }))
                 .contextWrite(ReactorUtils.context("imap", mdc(ctx))), message)
             // Manage throttling errors
             .doOnError(ctx::fireExceptionCaught)
             .subscribe();
+        disposableAttribute.set(disposable);
     }
 
     private void beforeIDLEUponProcessing(ChannelHandlerContext ctx) {
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
index 3a62cb0931..0db9543968 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.james.imap.api.process.ImapSession;
 
 import io.netty.util.AttributeKey;
+import reactor.core.Disposable;
 
 
 /**
@@ -40,6 +41,7 @@ public interface NettyConstants {
     String HEARTBEAT_HANDLER = "heartbeatHandler";
 
     AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = 
AttributeKey.valueOf("ImapSession");
+    AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = 
AttributeKey.valueOf("requestInFlight");
     AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY  = 
AttributeKey.valueOf("FrameDecoderMap");
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to