This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 77d195f8a552f702370543e6f7da906c2a399aa4 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Sat Mar 16 08:27:43 2024 +0100 [FIX] IMAP linearization can be simplified Handle the queuing and linearization within netty channel, not into reactor. This avoids complex signalling at the REACTOR level... --- .../netty/ImapChannelUpstreamHandler.java | 33 ++++++++-- .../james/imapserver/netty/Linearalizer.java | 71 ---------------------- .../james/imapserver/netty/NettyConstants.java | 2 +- .../james/imapserver/netty/LinearalizerTest.java | 71 ---------------------- 4 files changed, 30 insertions(+), 147 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index 934a5d4483..094d0284e3 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -29,6 +29,8 @@ import java.time.Duration; import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLHandshakeException; @@ -156,6 +158,11 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp } } + static class ImapLinerarizer { + private final AtomicBoolean isExecutingRequest = new AtomicBoolean(false); + private final ConcurrentLinkedQueue<Object> throttled = new ConcurrentLinkedQueue<>(); + } + public static ImapChannelUpstreamHandlerBuilder builder() { return new ImapChannelUpstreamHandlerBuilder(); } @@ -200,7 +207,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp authenticationConfiguration.isPlainAuthEnabled(), sessionId, authenticationConfiguration.getOidcSASLConfiguration()); ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).set(imapsession); - ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).set(new Linearalizer()); + ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).set(new ImapLinerarizer()); MDCBuilder boundMDC = IMAPMDCContext.boundMDC(ctx) .addToContext(MDCBuilder.SESSION_ID, sessionId.asString()); imapsession.setAttribute(MDC_KEY, boundMDC); @@ -375,8 +382,17 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp public void channelRead(ChannelHandlerContext ctx, Object msg) { imapCommandsMetric.increment(); ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get(); - Linearalizer linearalizer = ctx.channel().attr(LINEARALIZER_ATTRIBUTE_KEY).get(); Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY); + + ImapLinerarizer linearalizer = ctx.channel().attr(LINEARIZER_ATTRIBUTE_KEY).get(); + synchronized (linearalizer) { + if (linearalizer.isExecutingRequest.get()) { + linearalizer.throttled.add(msg); + return; + } + linearalizer.isExecutingRequest.set(true); + } + ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel()); ImapResponseComposerImpl response = new ImapResponseComposerImpl(writer); writer.setFlushCallback(response::flush); @@ -384,8 +400,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp beforeIDLEUponProcessing(ctx); ResponseEncoder responseEncoder = new ResponseEncoder(encoder, response); - Disposable disposable = reactiveThrottler.throttle( - linearalizer.execute(processor.processReactive(message, responseEncoder, session)) + Disposable disposable = reactiveThrottler.throttle(processor.processReactive(message, responseEncoder, session) .doOnEach(Throwing.consumer(signal -> { if (session.getState() == ImapSessionState.LOGOUT) { // Make sure we close the channel after all the buffers were flushed out @@ -407,6 +422,11 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp ctx.fireExceptionCaught(failure); } } + Object waitingMessage; + synchronized (linearalizer) { + linearalizer.isExecutingRequest.set(false); + waitingMessage = linearalizer.throttled.poll(); + } if (signal.isOnComplete() || signal.isOnError()) { afterIDLEUponProcessing(ctx); } @@ -416,6 +436,11 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp disposableAttribute.set(null); response.flush(); ctx.fireChannelReadComplete(); + if (signal.isOnComplete() || signal.isOnError()) { + if (waitingMessage != null && signal.isOnComplete()) { + channelRead(ctx, waitingMessage); + } + } })) .contextWrite(ReactorUtils.context("imap", mdc(session))), message) // Manage throttling errors diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java deleted file mode 100644 index 05cf3f9e20..0000000000 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/Linearalizer.java +++ /dev/null @@ -1,71 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.imapserver.netty; - -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReentrantLock; - -import org.reactivestreams.Publisher; - -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; - -public class Linearalizer { - private final ReentrantLock lock = new ReentrantLock(); - private boolean inFlight = false; - private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>(); - - public Mono<Void> execute(Publisher<Void> task) { - lock.lock(); - try { - if (!inFlight) { - inFlight = true; - return Mono.from(task) - .doFinally(any -> onRequestDone()); - } else { - // Queue the request for later - Sinks.One<Void> one = Sinks.one(); - queue.add(Mono.from(task) - .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)))); - // Let the caller await task completion - return one.asMono(); - } - } finally { - lock.unlock(); - } - } - - private void onRequestDone() { - lock.lock(); - try { - Publisher<Void> throttled = queue.poll(); - if (throttled != null) { - Mono.from(throttled) - .doFinally(any -> onRequestDone()) - .subscribe(); - } else { - inFlight = false; - } - } finally { - lock.unlock(); - } - } -} diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java index 2af87cd415..2aa10f32f4 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java @@ -41,8 +41,8 @@ public interface NettyConstants { String HEARTBEAT_HANDLER = "heartbeatHandler"; AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("ImapSession"); - AttributeKey<Linearalizer> LINEARALIZER_ATTRIBUTE_KEY = AttributeKey.valueOf("Linearalizer"); AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY = AttributeKey.valueOf("requestInFlight"); + AttributeKey<ImapChannelUpstreamHandler.ImapLinerarizer> LINEARIZER_ATTRIBUTE_KEY = AttributeKey.valueOf("linearizer"); AttributeKey<Runnable> BACKPRESSURE_CALLBACK = AttributeKey.valueOf("BACKPRESSURE_CALLBACK"); AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY = AttributeKey.valueOf("FrameDecoderMap"); diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java deleted file mode 100644 index 9d745a6188..0000000000 --- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/LinearalizerTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.imapserver.netty; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.jupiter.api.Test; -import org.testcontainers.shaded.org.awaitility.Awaitility; - -import reactor.core.publisher.Mono; - -class LinearalizerTest { - @Test - void shouldExecuteSubmittedTasks() { - Linearalizer testee = new Linearalizer(); - - // When I submit a task - AtomicBoolean executed = new AtomicBoolean(false); - Mono.from(testee.execute(Mono.delay(Duration.ofMillis(50)).then(Mono.fromRunnable(() -> executed.getAndSet(true))))).block(); - - // Then that task is executed - assertThat(executed.get()).isTrue(); - } - - @Test - void shouldNotExecuteQueuedTasksLogicRightAway() { - Linearalizer testee = new Linearalizer(); - - // When I submit 2 tasks task - AtomicBoolean executed = new AtomicBoolean(false); - Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe(); - Mono.from(testee.execute(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe(); - - // Then the second task is not executed staight away - assertThat(executed.get()).isFalse(); - } - - @Test - void shouldEventuallyExecuteQueuedTasks() { - Linearalizer testee = new Linearalizer(); - - // When I submit 2 tasks task - AtomicBoolean executed = new AtomicBoolean(false); - Mono.from(testee.execute(Mono.delay(Duration.ofMillis(200)).then())).subscribe(); - Mono.from(testee.execute(Mono.fromRunnable(() -> executed.getAndSet(true)))).subscribe(); - - // Then that task is eventually executed - Awaitility.await().atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> assertThat(executed.get()).isTrue()); - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org