This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.9.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 908880642ec39968a5cce6bb36e8218592b7dae8 Author: Benoit TELLIER <[email protected]> AuthorDate: Thu Oct 2 22:26:14 2025 +0200 JAMES-3816 Set an upper bound to ReactiveThrottler tasks in order to prevent depletion --- .../java/org/apache/james/imapserver/netty/ReactiveThrottler.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java index 5b19df2799..dad7f4e509 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java @@ -19,6 +19,7 @@ package org.apache.james.imapserver.netty; +import java.time.Duration; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -29,6 +30,7 @@ import java.util.function.Consumer; import org.apache.james.imap.api.ImapMessage; import org.apache.james.metrics.api.GaugeRegistry; +import org.apache.james.util.DurationParser; import org.reactivestreams.Publisher; import com.google.common.annotations.VisibleForTesting; @@ -39,6 +41,10 @@ import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; public class ReactiveThrottler { + private static final Duration MAX_EXECUTION_TIME = Optional.ofNullable(System.getProperty("james.imap.reactive.throttler.max.task.execution.time")) + .map(DurationParser::parse) + .orElse(Duration.ofHours(1)); + private static class TaskHolder { private final Publisher<Void> task; private final Consumer<Runnable> ctx; @@ -82,6 +88,7 @@ public class ReactiveThrottler { .subscribeOn(Schedulers.parallel()) .subscribe(taskHolder -> taskHolder.ctx.accept(() -> { Disposable disposable = Mono.from(taskHolder.task) + .timeout(MAX_EXECUTION_TIME) .doFinally(any -> onRequestDone()) .subscribe(); taskHolder.disposable.set(disposable); @@ -102,6 +109,7 @@ public class ReactiveThrottler { if (requestNumber <= maxConcurrentRequests) { // We have capacity for one more concurrent request return Mono.from(task) + .timeout(MAX_EXECUTION_TIME) .doFinally(any -> onRequestDone()); } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) { // Queue the request for later --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
