This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.9.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 908880642ec39968a5cce6bb36e8218592b7dae8
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu Oct 2 22:26:14 2025 +0200

    JAMES-3816 Set an upper bound to ReactiveThrottler tasks in order to prevent depletion
---
 .../java/org/apache/james/imapserver/netty/ReactiveThrottler.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 5b19df2799..dad7f4e509 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.imapserver.netty;
 
+import java.time.Duration;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -29,6 +30,7 @@ import java.util.function.Consumer;
 
 import org.apache.james.imap.api.ImapMessage;
 import org.apache.james.metrics.api.GaugeRegistry;
+import org.apache.james.util.DurationParser;
 import org.reactivestreams.Publisher;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -39,6 +41,10 @@ import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Schedulers;
 
 public class ReactiveThrottler {
+    private static final Duration MAX_EXECUTION_TIME = Optional.ofNullable(System.getProperty("james.imap.reactive.throttler.max.task.execution.time"))
+        .map(DurationParser::parse)
+        .orElse(Duration.ofHours(1));
+
     private static class TaskHolder {
         private final Publisher<Void> task;
         private final Consumer<Runnable> ctx;
@@ -82,6 +88,7 @@ public class ReactiveThrottler {
             .subscribeOn(Schedulers.parallel())
             .subscribe(taskHolder -> taskHolder.ctx.accept(() -> {
                     Disposable disposable = Mono.from(taskHolder.task)
+                        .timeout(MAX_EXECUTION_TIME)
                         .doFinally(any -> onRequestDone())
                         .subscribe();
                     taskHolder.disposable.set(disposable);
@@ -102,6 +109,7 @@ public class ReactiveThrottler {
         if (requestNumber <= maxConcurrentRequests) {
             // We have capacity for one more concurrent request
             return Mono.from(task)
+                .timeout(MAX_EXECUTION_TIME)
                 .doFinally(any -> onRequestDone());
         } else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
             // Queue the request for later


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to