This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 77d195f8a552f702370543e6f7da906c2a399aa4
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Sat Mar 16 08:27:43 2024 +0100

    [FIX] IMAP linearization can be simplified
    
    Handle the queuing and linearization within netty channel,
    not into reactor.
    
    This avoids complex signalling at the REACTOR level...
---
 .../netty/ImapChannelUpstreamHandler.java          | 33 ++++++++--
 .../james/imapserver/netty/Linearalizer.java       | 71 ----------------------
 .../james/imapserver/netty/NettyConstants.java     |  2 +-
 .../james/imapserver/netty/LinearalizerTest.java   | 71 ----------------------
 4 files changed, 30 insertions(+), 147 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 934a5d4483..094d0284e3 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -29,6 +29,8 @@ import java.time.Duration;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.ssl.SSLHandshakeException;
 
@@ -156,6 +158,11 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
         }
     }
 
+    static class ImapLinerarizer {
+        private final AtomicBoolean isExecutingRequest = new 
AtomicBoolean(false);
+        private final ConcurrentLinkedQueue<Object> throttled = new 
ConcurrentLinkedQueue<>();
+    }
+
     public static ImapChannelUpstreamHandlerBuilder builder() {
         return new ImapChannelUpstreamHandlerBuilder();
     }
@@ -200,7 +207,7 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
             authenticationConfiguration.isPlainAuthEnabled(), sessionId,
             authenticationConfiguration.getOidcSASLConfiguration());
         ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession);
-        ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).set(new Linearalizer());
+        ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).set(new 
ImapLinerarizer());
         MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx)
             .addToContext(MDCBuilder.SESSION_ID, sessionId.asString());
         imapsession.setAttribute(MDC_KEY, boundMDC);
@@ -375,8 +382,17 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         imapCommandsMetric.increment();
         ImapSession session = 
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
-        Linearalizer linearalizer = 
ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).get();
         Attribute<Disposable> disposableAttribute = 
ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
+
+        ImapLinerarizer linearalizer = 
ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).get();
+        synchronized (linearalizer) {
+            if (linearalizer.isExecutingRequest.get()) {
+                linearalizer.throttled.add(msg);
+                return;
+            }
+            linearalizer.isExecutingRequest.set(true);
+        }
+
         ChannelImapResponseWriter writer = new 
ChannelImapResponseWriter(ctx.channel());
         ImapResponseComposerImpl response = new 
ImapResponseComposerImpl(writer);
         writer.setFlushCallback(response::flush);
@@ -384,8 +400,7 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
 
         beforeIDLEUponProcessing(ctx);
         ResponseEncoder responseEncoder = new ResponseEncoder(encoder, 
response);
-        Disposable disposable = reactiveThrottler.throttle(
-            linearalizer.execute(processor.processReactive(message, 
responseEncoder, session))
+        Disposable disposable = 
reactiveThrottler.throttle(processor.processReactive(message, responseEncoder, 
session)
                 .doOnEach(Throwing.consumer(signal -> {
                     if (session.getState() == ImapSessionState.LOGOUT) {
                         // Make sure we close the channel after all the 
buffers were flushed out
@@ -407,6 +422,11 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
                             ctx.fireExceptionCaught(failure);
                         }
                     }
+                    Object waitingMessage;
+                    synchronized (linearalizer) {
+                        linearalizer.isExecutingRequest.set(false);
+                        waitingMessage = linearalizer.throttled.poll();
+                    }
                     if (signal.isOnComplete() || signal.isOnError()) {
                         afterIDLEUponProcessing(ctx);
                     }
@@ -416,6 +436,11 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
                     disposableAttribute.set(null);
                     response.flush();
                     ctx.fireChannelReadComplete();
+                    if (signal.isOnComplete() || signal.isOnError()) {
+                        if (waitingMessage != null && signal.isOnComplete()) {
+                            channelRead(ctx, waitingMessage);
+                        }
+                    }
                 }))
                 .contextWrite(ReactorUtils.context("imap", mdc(session))), 
message)
             // Manage throttling errors
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
deleted file mode 100644
index 05cf3f9e20..0000000000
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.imapserver.netty;
-
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.reactivestreams.Publisher;
-
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-
-public class Linearalizer {
-    private final ReentrantLock lock = new ReentrantLock();
-    private boolean inFlight = false;
-    private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
-
-    public Mono<Void> execute(Publisher<Void> task) {
-        lock.lock();
-        try {
-            if (!inFlight) {
-                inFlight = true;
-                return Mono.from(task)
-                    .doFinally(any -> onRequestDone());
-            } else {
-                // Queue the request for later
-                Sinks.One<Void> one = Sinks.one();
-                queue.add(Mono.from(task)
-                    .then(Mono.fromRunnable(() -> 
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
-                // Let the caller await task completion
-                return one.asMono();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void onRequestDone() {
-        lock.lock();
-        try {
-            Publisher<Void> throttled = queue.poll();
-            if (throttled != null) {
-                Mono.from(throttled)
-                    .doFinally(any -> onRequestDone())
-                    .subscribe();
-            } else {
-                inFlight = false;
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-}
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
index 2af87cd415..2aa10f32f4 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
@@ -41,8 +41,8 @@ public interface NettyConstants {
     String HEARTBEAT_HANDLER = "heartbeatHandler";
 
     AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = 
AttributeKey.valueOf("ImapSession");
-    AttributeKey<Linearalizer> LINEARALIZER_ATTRIBUTE_KEY = 
AttributeKey.valueOf("Linearalizer");
     AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = 
AttributeKey.valueOf("requestInFlight");
+    AttributeKey<ImapChannelUpstreamHandler.ImapLinerarizer> 
LINEARIZER_ATTRIBUTE_KEY = AttributeKey.valueOf("linearizer");
     AttributeKey<Runnable> BACKPRESSURE_CALLBACK = 
AttributeKey.valueOf("BACKPRESSURE_CALLBACK");
     AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY  = 
AttributeKey.valueOf("FrameDecoderMap");
 
diff --git 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
deleted file mode 100644
index 9d745a6188..0000000000
--- 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.imapserver.netty;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.time.Duration;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.jupiter.api.Test;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-
-import reactor.core.publisher.Mono;
-
-class LinearalizerTest {
-    @Test
-    void shouldExecuteSubmittedTasks() {
-        Linearalizer testee = new Linearalizer();
-
-        // When I submit a task
-        AtomicBoolean executed = new AtomicBoolean(false);
-        
Mono.from(testee.execute(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(()
 -> executed.getAndSet(true))))).block();
-
-        // Then that task is executed
-        assertThat(executed.get()).isTrue();
-    }
-
-    @Test
-    void shouldNotExecuteQueuedTasksLogicRightAway() {
-        Linearalizer testee = new Linearalizer();
-
-        // When I submit 2 tasks task
-        AtomicBoolean executed = new AtomicBoolean(false);
-        
Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
-        Mono.from(testee.execute(Mono.fromRunnable(() -> 
executed.getAndSet(true)))).subscribe();
-
-        // Then the second task is not executed staight away
-        assertThat(executed.get()).isFalse();
-    }
-
-    @Test
-    void shouldEventuallyExecuteQueuedTasks() {
-        Linearalizer testee = new Linearalizer();
-
-        // When I submit 2 tasks task
-        AtomicBoolean executed = new AtomicBoolean(false);
-        
Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe();
-        Mono.from(testee.execute(Mono.fromRunnable(() -> 
executed.getAndSet(true)))).subscribe();
-
-        // Then that task is eventually executed
-        Awaitility.await().atMost(Duration.ofSeconds(10))
-            .untilAsserted(() -> assertThat(executed.get()).isTrue());
-    }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to