This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a661f7d63d4508b2355b7ba8e831ddf06d36b1d8 Author: Benoit TELLIER <[email protected]> AuthorDate: Mon Nov 4 11:40:41 2024 +0100 [FIX] No longer use recursivity in ReactiveThrottler::onRequestDone --- .../james/imapserver/netty/ReactiveThrottler.java | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java index 898149663f..9ada14ec27 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java @@ -33,6 +33,7 @@ import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; public class ReactiveThrottler { private static class TaskHolder { @@ -62,12 +63,24 @@ public class ReactiveThrottler { // In flight + executing private final AtomicInteger concurrentRequests = new AtomicInteger(0); private final Queue<TaskHolder> queue = new ConcurrentLinkedQueue<>(); + private final Sinks.Many<TaskHolder> sink; public ReactiveThrottler(GaugeRegistry gaugeRegistry, int maxConcurrentRequests, int maxQueueSize) { gaugeRegistry.register("imap.request.queue.size", () -> Math.max(concurrentRequests.get() - maxConcurrentRequests, 0)); this.maxConcurrentRequests = maxConcurrentRequests; this.maxQueueSize = maxQueueSize; + this.sink = Sinks.many().multicast() + .onBackpressureBuffer(); + + sink.asFlux() + .subscribeOn(Schedulers.parallel()) + .subscribe(taskHolder -> { + Disposable disposable = Mono.from(taskHolder.task) + .doFinally(any -> onRequestDone()) + .subscribe(); + taskHolder.disposable.set(disposable); + }); } public Mono<Void> throttle(Publisher<Void> task, ImapMessage imapMessage) { @@ -122,12 +135,9 @@ public class ReactiveThrottler { concurrentRequests.getAndDecrement(); TaskHolder throttled = queue.poll(); if (throttled != null) { - Disposable disposable = Mono.from(throttled.task) - .doFinally(any -> { - onRequestDone(); - }) - .subscribe(); - throttled.disposable.set(disposable); + synchronized (sink) { + sink.emitNext(throttled, Sinks.EmitFailureHandler.FAIL_FAST); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
