This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 9c4a58d5e7 JAMES-4019 ReactiveThrottler should handle better cancellation (#2104) 9c4a58d5e7 is described below commit 9c4a58d5e73877e28570f138b7199b32b1b6b461 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Tue Mar 12 06:57:27 2024 +0100 JAMES-4019 ReactiveThrottler should handle better cancellation (#2104) --- .../james/imapserver/netty/ReactiveThrottler.java | 46 ++++++++++++--- .../imapserver/netty/ReactiveThrottlerTest.java | 67 ++++++++++++++++++++++ 2 files changed, 106 insertions(+), 7 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java index 05998122ae..09b1b9d01d 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java @@ -19,18 +19,31 @@ package org.apache.james.imapserver.netty; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.james.imap.api.ImapMessage; import org.apache.james.metrics.api.GaugeRegistry; import org.reactivestreams.Publisher; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; public class ReactiveThrottler { + private static class TaskHolder { + private final Publisher<Void> task; + private final AtomicReference<Disposable> disposable = new AtomicReference<>(); + + private TaskHolder(Publisher<Void> task) { + this.task = task; + } + } + public static class RejectedException extends RuntimeException { private final ImapMessage imapMessage; @@ -48,7 +61,7 @@ public class ReactiveThrottler { private final int maxQueueSize; // In flight + executing private final AtomicInteger concurrentRequests = new AtomicInteger(0); - private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>(); + private final Queue<TaskHolder> queue = new ConcurrentLinkedQueue<>(); public ReactiveThrottler(GaugeRegistry gaugeRegistry, int maxConcurrentRequests, int maxQueueSize) { gaugeRegistry.register("imap.request.queue.size", () -> Math.max(concurrentRequests.get() - maxConcurrentRequests, 0)); @@ -69,11 +82,27 @@ public class ReactiveThrottler { .doFinally(any -> onRequestDone()); } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) { // Queue the request for later + AtomicBoolean cancelled = new AtomicBoolean(false); Sinks.One<Void> one = Sinks.one(); - queue.add(Mono.from(task) + TaskHolder taskHolder = new TaskHolder(Mono.fromCallable(cancelled::get) + .flatMap(cancel -> { + if (cancel) { + return Mono.empty(); + } + return Mono.from(task); + }) .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)))); + queue.add(taskHolder); // Let the caller await task completion - return one.asMono(); + return one.asMono() + .doOnCancel(() -> { + cancelled.set(true); + Optional.ofNullable(taskHolder.disposable.get()).ifPresent(Disposable::dispose); + boolean removed = queue.remove(taskHolder); + if (removed) { + concurrentRequests.decrementAndGet(); + } + }); } else { concurrentRequests.decrementAndGet(); @@ -86,12 +115,15 @@ public class ReactiveThrottler { } private void onRequestDone() { - concurrentRequests.decrementAndGet(); - Publisher<Void> throttled = queue.poll(); + concurrentRequests.getAndDecrement(); + TaskHolder throttled = queue.poll(); if (throttled != null) { - Mono.from(throttled) - .doFinally(any -> onRequestDone()) + Disposable disposable = Mono.from(throttled.task) + .doFinally(any -> { + onRequestDone(); + }) .subscribe(); + throttled.disposable.set(disposable); } } } diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java index c11ab260a8..e1adafa9d5 100644 --- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java +++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java @@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.time.Duration; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -34,8 +35,10 @@ import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.testcontainers.shaded.org.awaitility.Awaitility; +import com.github.fge.lambdas.Throwing; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; class ReactiveThrottlerTest { @@ -69,6 +72,70 @@ class ReactiveThrottlerTest { assertThat(executed.get()).isFalse(); } + @RepeatedTest(10) + void shouldPropagateCancel() throws Exception { + ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 5); + CountDownLatch latch = new CountDownLatch(1); + + // Given a throttler + + // When I submit many tasks task - they will get queued + AtomicBoolean executed = new AtomicBoolean(false); + Disposable disposable1 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable2 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable3 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable4 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable5 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable6 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable7 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + + disposable7.dispose(); + disposable6.dispose(); + disposable5.dispose(); + disposable4.dispose(); + disposable3.dispose(); + disposable2.dispose(); + disposable1.dispose(); + Thread.sleep(200); + + Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)), NO_IMAP_MESSAGE)).block(); + + // Then that task is not executed straight away + assertThat(executed.get()).isTrue(); + } + + @RepeatedTest(10) + void shouldPropagateCancelInReverseOrder() throws Exception { + ReactiveThrottler testee = new ReactiveThrottler(new NoopGaugeRegistry(), 2, 5); + CountDownLatch latch = new CountDownLatch(1); + + // Given a throttler + + // When I submit many tasks task - they will get queued + AtomicBoolean executed = new AtomicBoolean(false); + Disposable disposable1 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable2 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable3 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable4 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable5 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable6 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + Disposable disposable7 = Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(), NO_IMAP_MESSAGE)).subscribe(); + + disposable1.dispose(); + disposable2.dispose(); + disposable3.dispose(); + disposable4.dispose(); + disposable5.dispose(); + disposable6.dispose(); + disposable7.dispose(); + Thread.sleep(200); + + Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.getAndSet(true)), NO_IMAP_MESSAGE)).block(); + + // Then that task is not executed straight away + assertThat(executed.get()).isTrue(); + } + @Test void throttleShouldEventuallyExecuteQueuedTasks() { // Given a throttler --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org