This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da8393d53 [ENHANCEMENT] Make IMAP ReactiveThrottler more reliable 
under load (#3012)
4da8393d53 is described below

commit 4da8393d533215a45b422076bf4f46b71409da3b
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Apr 14 08:51:54 2026 +0200

    [ENHANCEMENT] Make IMAP ReactiveThrottler more reliable under load (#3012)
---
 .../james/imapserver/netty/ReactiveThrottler.java  |  15 +-
 .../imapserver/netty/ReactiveThrottlerTest.java    | 173 +++++++++++++++++++++
 2 files changed, 183 insertions(+), 5 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 8d5dd196a0..c75744bdb8 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -123,17 +123,22 @@ public class ReactiveThrottler {
                     }
                     return Mono.from(task);
                 })
-                .then(Mono.fromRunnable(() -> 
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))), ctx);
+                // Signal the outer one.asMono() subscriber on both success 
and error paths.
+                // tryEmitEmpty/tryEmitError are used (not FAIL_FAST) to avoid 
throwing when
+                // the subscriber has already cancelled (e.g. channel closed 
mid-queue).
+                .doOnSuccess(ignored -> one.tryEmitEmpty())
+                .doOnError(one::tryEmitError), ctx);
             queue.add(taskHolder);
             // Let the caller await task completion
             return one.asMono()
                 .doOnCancel(() -> {
                     cancelled.set(true);
+                    // Do NOT decrement concurrentRequests here when 
disposable is null
+                    // (task not yet dispatched). The TaskHolder stays in the 
queue, will be
+                    // dispatched as Mono.empty() by the next onRequestDone(), 
and that call
+                    // handles the single authoritative decrement. 
                     Optional.ofNullable(taskHolder.disposable.get())
-                        .ifPresentOrElse(Disposable::dispose,
-                            // If no Disposable set → task never started,
-                            // but still counted → must release the slot
-                            concurrentRequests::decrementAndGet);
+                        .ifPresent(Disposable::dispose);
                 });
         } else {
             concurrentRequests.decrementAndGet();
diff --git 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index a5cd3d461c..efa84597ae 100644
--- 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -27,8 +27,11 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.james.imap.api.ImapMessage;
+import org.apache.james.metrics.api.Gauge;
+import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.awaitility.Awaitility;
@@ -284,6 +287,176 @@ class ReactiveThrottlerTest {
         assertThat(executed.get()).isTrue();
     }
 
+    @RepeatedTest(10)
+    void concurrencyLimitShouldBeRespectedAfterCancellingQueuedTasks() throws 
Exception {
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 10);
+        CountDownLatch blocker = new CountDownLatch(1);
+
+        // Fill both concurrent slots with tasks that block until we say so
+        Mono.from(testee.throttle(
+            
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+            NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(
+            
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+            NO_IMAP_MESSAGE)).subscribe();
+
+        // Queue 5 tasks then cancel each one before it is dispatched
+        for (int i = 0; i < 5; i++) {
+            
Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(10)).then(), 
NO_IMAP_MESSAGE))
+                .subscribe()
+                .dispose();
+        }
+        Thread.sleep(100); // Let cancellation callbacks propagate
+
+        // Release the blocking tasks
+        blocker.countDown();
+        Thread.sleep(200);
+
+        // Submit new tasks and verify parallelism never exceeds 
maxConcurrentRequests = 2
+        AtomicInteger concurrent = new AtomicInteger(0);
+        ConcurrentLinkedDeque<Integer> snapshots = new 
ConcurrentLinkedDeque<>();
+        Mono<Void> measured = Mono.fromRunnable(() -> 
snapshots.add(concurrent.incrementAndGet()))
+            .then(Mono.delay(Duration.ofMillis(50)))
+            .then(Mono.fromRunnable(() -> 
snapshots.add(concurrent.getAndDecrement())));
+
+        for (int i = 0; i < 6; i++) {
+            Mono.from(testee.throttle(measured, NO_IMAP_MESSAGE))
+                .onErrorResume(ReactiveThrottler.RejectedException.class, e -> 
Mono.empty())
+                .subscribe();
+        }
+
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+            .untilAsserted(() -> 
assertThat(snapshots.size()).isGreaterThanOrEqualTo(8));
+
+        assertThat(snapshots).allSatisfy(count -> 
assertThat(count).isBetween(0, 2));
+    }
+
+    @Test
+    void queuedTaskErrorShouldPropagateToOuterSubscriber() {
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 5);
+
+        // Fill both concurrent slots so the next task is queued
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+
+        AtomicBoolean signalReceived = new AtomicBoolean(false);
+        Mono.from(testee.throttle(
+                Mono.error(new RuntimeException("simulated task failure")),
+                NO_IMAP_MESSAGE))
+            .doOnError(e -> signalReceived.set(true))
+            .doOnSuccess(v -> signalReceived.set(true))
+            .onErrorResume(e -> Mono.empty())
+            .subscribe();
+
+        // The outer subscriber must receive either onComplete or onError 
within a reasonable time
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+            .untilAsserted(() -> assertThat(signalReceived.get()).isTrue());
+    }
+
+    @Test
+    void queuedTaskErrorShouldFreeThrottlerSlotForSubsequentTasks() {
+        // A queued task that errors must still release its concurrency slot 
via doFinally,
+        // so the tasks behind it in the queue are not permanently stalled.
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 5);
+
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+
+        // A queued task that will fail when dispatched
+        Mono.from(testee.throttle(
+                Mono.error(new RuntimeException("task error")),
+                NO_IMAP_MESSAGE))
+            .onErrorResume(e -> Mono.empty())
+            .subscribe();
+
+        // A normal task queued after the failing one
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.set(true)), 
NO_IMAP_MESSAGE))
+            .subscribe();
+
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+            .untilAsserted(() -> assertThat(executed.get()).isTrue());
+    }
+
+    @Test
+    void cancelledQueuedTaskShouldNotPreventSubsequentTaskExecution() throws 
Exception {
+        // Verifies that a mix of cancelled and completed queued tasks does 
not leave
+        // the throttler in a state where later tasks are permanently stalled.
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 20);
+        CountDownLatch blocker = new CountDownLatch(1);
+
+        Mono.from(testee.throttle(
+            
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+            NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(
+            
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+            NO_IMAP_MESSAGE)).subscribe();
+
+        // Mix: some queued tasks cancelled, some left to run normally.
+        // Short delay so non-cancelled tasks complete well within the test 
timeout.
+        for (int i = 0; i < 8; i++) {
+            Disposable d = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then(), 
NO_IMAP_MESSAGE))
+                .subscribe();
+            if (i % 2 == 0) {
+                d.dispose();
+            }
+        }
+
+        blocker.countDown();
+        Thread.sleep(1000); // Enough for all 4 non-cancelled 100ms tasks to 
drain (2 concurrent max → 200ms min)
+
+        AtomicInteger executionCount = new AtomicInteger(0);
+        for (int i = 0; i < 4; i++) {
+            
Mono.from(testee.throttle(Mono.fromRunnable(executionCount::incrementAndGet), 
NO_IMAP_MESSAGE))
+                .onErrorResume(ReactiveThrottler.RejectedException.class, e -> 
Mono.empty())
+                .subscribe();
+        }
+
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+            .untilAsserted(() -> 
assertThat(executionCount.get()).isEqualTo(4));
+    }
+
+    @Test
+    void concurrentRequestsGaugeShouldBeZeroWhenIdle() throws Exception {
+        AtomicReference<Gauge<Integer>> concurrentCountGauge = new 
AtomicReference<>();
+        GaugeRegistry capturingRegistry = new GaugeRegistry() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public <T> GaugeRegistry register(String name, Gauge<T> gauge) {
+                if (name.equals("imap.request.concurrent.count")) {
+                    concurrentCountGauge.set((Gauge<Integer>) gauge);
+                }
+                return this;
+            }
+
+            @Override
+            public <T> GaugeRegistry.SettableGauge<T> settableGauge(String 
name) {
+                return value -> { };
+            }
+        };
+        ReactiveThrottler testee = new ReactiveThrottler(capturingRegistry, 2, 
10);
+
+        // Stress the counter: fill slots, queue tasks, cancel them all
+        CountDownLatch blocker = new CountDownLatch(1);
+        Mono.from(testee.throttle(
+            
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+            NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(
+            
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+            NO_IMAP_MESSAGE)).subscribe();
+        for (int i = 0; i < 5; i++) {
+            
Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(10)).then(), 
NO_IMAP_MESSAGE))
+                .subscribe()
+                .dispose();
+        }
+        Thread.sleep(100);
+        blocker.countDown();
+
+        // Wait for the throttler to fully drain
+        Awaitility.await().atMost(Duration.ofSeconds(5))
+            .untilAsserted(() -> 
assertThat(concurrentCountGauge.get().get()).isZero());
+    }
+
     @Test
     void throttleShouldNotExceedItsConcurrency() {
         // Given a throttler


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to