This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a661f7d63d4508b2355b7ba8e831ddf06d36b1d8
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Nov 4 11:40:41 2024 +0100

    [FIX] No longer use recursivity in ReactiveThrottler::onRequestDone
---
 .../james/imapserver/netty/ReactiveThrottler.java  | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 898149663f..9ada14ec27 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -33,6 +33,7 @@ import org.reactivestreams.Publisher;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
 
 public class ReactiveThrottler {
     private static class TaskHolder {
@@ -62,12 +63,24 @@ public class ReactiveThrottler {
     // In flight + executing
     private final AtomicInteger concurrentRequests = new AtomicInteger(0);
     private final Queue<TaskHolder> queue = new ConcurrentLinkedQueue<>();
+    private final Sinks.Many<TaskHolder> sink;
 
     public ReactiveThrottler(GaugeRegistry gaugeRegistry, int 
maxConcurrentRequests, int maxQueueSize) {
         gaugeRegistry.register("imap.request.queue.size", () -> 
Math.max(concurrentRequests.get() - maxConcurrentRequests, 0));
 
         this.maxConcurrentRequests = maxConcurrentRequests;
         this.maxQueueSize = maxQueueSize;
+        this.sink = Sinks.many().multicast()
+            .onBackpressureBuffer();
+
+        sink.asFlux()
+            .subscribeOn(Schedulers.parallel())
+            .subscribe(taskHolder -> {
+                    Disposable disposable = Mono.from(taskHolder.task)
+                        .doFinally(any -> onRequestDone())
+                        .subscribe();
+                    taskHolder.disposable.set(disposable);
+                });
     }
 
     public Mono<Void> throttle(Publisher<Void> task, ImapMessage imapMessage) {
@@ -122,12 +135,9 @@ public class ReactiveThrottler {
         concurrentRequests.getAndDecrement();
         TaskHolder throttled = queue.poll();
         if (throttled != null) {
-            Disposable disposable = Mono.from(throttled.task)
-                .doFinally(any -> {
-                    onRequestDone();
-                })
-                .subscribe();
-            throttled.disposable.set(disposable);
+            synchronized (sink) {
+                sink.emitNext(throttled, Sinks.EmitFailureHandler.FAIL_FAST);
+            }
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to