This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c4a58d5e7 JAMES-4019 ReactiveThrottler should handle better 
cancellation (#2104)
9c4a58d5e7 is described below

commit 9c4a58d5e73877e28570f138b7199b32b1b6b461
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Tue Mar 12 06:57:27 2024 +0100

    JAMES-4019 ReactiveThrottler should handle better cancellation (#2104)
---
 .../james/imapserver/netty/ReactiveThrottler.java  | 46 ++++++++++++---
 .../imapserver/netty/ReactiveThrottlerTest.java    | 67 ++++++++++++++++++++++
 2 files changed, 106 insertions(+), 7 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 05998122ae..09b1b9d01d 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -19,18 +19,31 @@
 
 package org.apache.james.imapserver.netty;
 
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.james.imap.api.ImapMessage;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.reactivestreams.Publisher;
 
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
 
 public class ReactiveThrottler {
+    private static class TaskHolder {
+        private final Publisher<Void> task;
+        private final AtomicReference<Disposable> disposable = new 
AtomicReference<>();
+
+        private TaskHolder(Publisher<Void> task) {
+            this.task = task;
+        }
+    }
+
     public static class RejectedException extends RuntimeException {
         private final ImapMessage imapMessage;
 
@@ -48,7 +61,7 @@ public class ReactiveThrottler {
     private final int maxQueueSize;
     // In flight + executing
     private final AtomicInteger concurrentRequests = new AtomicInteger(0);
-    private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+    private final Queue<TaskHolder> queue = new ConcurrentLinkedQueue<>();
 
     public ReactiveThrottler(GaugeRegistry gaugeRegistry, int 
maxConcurrentRequests, int maxQueueSize) {
         gaugeRegistry.register("imap.request.queue.size", () -> 
Math.max(concurrentRequests.get() - maxConcurrentRequests, 0));
@@ -69,11 +82,27 @@ public class ReactiveThrottler {
                 .doFinally(any -> onRequestDone());
         } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
             // Queue the request for later
+            AtomicBoolean cancelled = new AtomicBoolean(false);
             Sinks.One<Void> one = Sinks.one();
-            queue.add(Mono.from(task)
+            TaskHolder taskHolder = new 
TaskHolder(Mono.fromCallable(cancelled::get)
+                .flatMap(cancel -> {
+                    if (cancel) {
+                        return Mono.empty();
+                    }
+                    return Mono.from(task);
+                })
                 .then(Mono.fromRunnable(() -> 
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+            queue.add(taskHolder);
             // Let the caller await task completion
-            return one.asMono();
+            return one.asMono()
+                .doOnCancel(() -> {
+                    cancelled.set(true);
+                    
Optional.ofNullable(taskHolder.disposable.get()).ifPresent(Disposable::dispose);
+                    boolean removed = queue.remove(taskHolder);
+                    if (removed) {
+                        concurrentRequests.decrementAndGet();
+                    }
+                });
         } else {
             concurrentRequests.decrementAndGet();
 
@@ -86,12 +115,15 @@ public class ReactiveThrottler {
     }
 
     private void onRequestDone() {
-        concurrentRequests.decrementAndGet();
-        Publisher<Void> throttled = queue.poll();
+        concurrentRequests.getAndDecrement();
+        TaskHolder throttled = queue.poll();
         if (throttled != null) {
-            Mono.from(throttled)
-                .doFinally(any -> onRequestDone())
+            Disposable disposable = Mono.from(throttled.task)
+                .doFinally(any -> {
+                    onRequestDone();
+                })
                 .subscribe();
+            throttled.disposable.set(disposable);
         }
     }
 }
diff --git 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index c11ab260a8..e1adafa9d5 100644
--- 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -24,6 +24,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.time.Duration;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,8 +35,10 @@ import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
+import com.github.fge.lambdas.Throwing;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class ReactiveThrottlerTest {
 
@@ -69,6 +72,70 @@ class ReactiveThrottlerTest {
         assertThat(executed.get()).isFalse();
     }
 
+    @RepeatedTest(10)
+    void shouldPropagateCancel() throws Exception {
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 5);
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // Given a throttler
+
+        // When I submit many tasks task - they will get queued
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Disposable disposable1 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable2 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable3 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable4 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable5 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable6 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable7 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+
+        disposable7.dispose();
+        disposable6.dispose();
+        disposable5.dispose();
+        disposable4.dispose();
+        disposable3.dispose();
+        disposable2.dispose();
+        disposable1.dispose();
+        Thread.sleep(200);
+
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+
+        // Then that task is not executed straight away
+        assertThat(executed.get()).isTrue();
+    }
+
+    @RepeatedTest(10)
+    void shouldPropagateCancelInReverseOrder() throws Exception {
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 5);
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // Given a throttler
+
+        // When I submit many tasks task - they will get queued
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Disposable disposable1 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable2 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable3 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable4 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable5 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable6 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+        Disposable disposable7 = 
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
 NO_IMAP_MESSAGE)).subscribe();
+
+        disposable1.dispose();
+        disposable2.dispose();
+        disposable3.dispose();
+        disposable4.dispose();
+        disposable5.dispose();
+        disposable6.dispose();
+        disposable7.dispose();
+        Thread.sleep(200);
+
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+
+        // Then that task is not executed straight away
+        assertThat(executed.get()).isTrue();
+    }
+
     @Test
     void throttleShouldEventuallyExecuteQueuedTasks() {
         // Given a throttler


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to