This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 4da8393d53 [ENHANCEMENT] Make IMAP ReactiveThrottler more reliable under load (#3012)
4da8393d53 is described below
commit 4da8393d533215a45b422076bf4f46b71409da3b
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Apr 14 08:51:54 2026 +0200
[ENHANCEMENT] Make IMAP ReactiveThrottler more reliable under load (#3012)
---
.../james/imapserver/netty/ReactiveThrottler.java | 15 +-
.../imapserver/netty/ReactiveThrottlerTest.java | 173 +++++++++++++++++++++
2 files changed, 183 insertions(+), 5 deletions(-)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 8d5dd196a0..c75744bdb8 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -123,17 +123,22 @@ public class ReactiveThrottler {
}
return Mono.from(task);
})
- .then(Mono.fromRunnable(() ->
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))), ctx);
+ // Signal the outer one.asMono() subscriber on both success and error paths.
+ // tryEmitEmpty/tryEmitError are used (not FAIL_FAST) to avoid throwing when
+ // the subscriber has already cancelled (e.g. channel closed mid-queue).
+ .doOnSuccess(ignored -> one.tryEmitEmpty())
+ .doOnError(one::tryEmitError), ctx);
queue.add(taskHolder);
// Let the caller await task completion
return one.asMono()
.doOnCancel(() -> {
cancelled.set(true);
+ // Do NOT decrement concurrentRequests here when disposable is null
+ // (task not yet dispatched). The TaskHolder stays in the queue, will be
+ // dispatched as Mono.empty() by the next onRequestDone(), and that call
+ // handles the single authoritative decrement.
Optional.ofNullable(taskHolder.disposable.get())
- .ifPresentOrElse(Disposable::dispose,
- // If no Disposable set → task never started,
- // but still counted → must release the slot
- concurrentRequests::decrementAndGet);
+ .ifPresent(Disposable::dispose);
});
} else {
concurrentRequests.decrementAndGet();
diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index a5cd3d461c..efa84597ae 100644
--- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -27,8 +27,11 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.james.imap.api.ImapMessage;
+import org.apache.james.metrics.api.Gauge;
+import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.awaitility.Awaitility;
@@ -284,6 +287,176 @@ class ReactiveThrottlerTest {
assertThat(executed.get()).isTrue();
}
+ @RepeatedTest(10)
+ void concurrencyLimitShouldBeRespectedAfterCancellingQueuedTasks() throws
Exception {
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 10);
+ CountDownLatch blocker = new CountDownLatch(1);
+
+ // Fill both concurrent slots with tasks that block until we say so
+ Mono.from(testee.throttle(
+
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+ NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(
+
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+ NO_IMAP_MESSAGE)).subscribe();
+
+ // Queue 5 tasks then cancel each one before it is dispatched
+ for (int i = 0; i < 5; i++) {
+
Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(10)).then(),
NO_IMAP_MESSAGE))
+ .subscribe()
+ .dispose();
+ }
+ Thread.sleep(100); // Let cancellation callbacks propagate
+
+ // Release the blocking tasks
+ blocker.countDown();
+ Thread.sleep(200);
+
+ // Submit new tasks and verify parallelism never exceeds maxConcurrentRequests = 2
+ AtomicInteger concurrent = new AtomicInteger(0);
+ ConcurrentLinkedDeque<Integer> snapshots = new
ConcurrentLinkedDeque<>();
+ Mono<Void> measured = Mono.fromRunnable(() ->
snapshots.add(concurrent.incrementAndGet()))
+ .then(Mono.delay(Duration.ofMillis(50)))
+ .then(Mono.fromRunnable(() ->
snapshots.add(concurrent.getAndDecrement())));
+
+ for (int i = 0; i < 6; i++) {
+ Mono.from(testee.throttle(measured, NO_IMAP_MESSAGE))
+ .onErrorResume(ReactiveThrottler.RejectedException.class, e ->
Mono.empty())
+ .subscribe();
+ }
+
+ Awaitility.await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
assertThat(snapshots.size()).isGreaterThanOrEqualTo(8));
+
+ assertThat(snapshots).allSatisfy(count ->
assertThat(count).isBetween(0, 2));
+ }
+
+ @Test
+ void queuedTaskErrorShouldPropagateToOuterSubscriber() {
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 5);
+
+ // Fill both concurrent slots so the next task is queued
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(200)).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ AtomicBoolean signalReceived = new AtomicBoolean(false);
+ Mono.from(testee.throttle(
+ Mono.error(new RuntimeException("simulated task failure")),
+ NO_IMAP_MESSAGE))
+ .doOnError(e -> signalReceived.set(true))
+ .doOnSuccess(v -> signalReceived.set(true))
+ .onErrorResume(e -> Mono.empty())
+ .subscribe();
+
+ // The outer subscriber must receive either onComplete or onError within a reasonable time
+ Awaitility.await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertThat(signalReceived.get()).isTrue());
+ }
+
+ @Test
+ void queuedTaskErrorShouldFreeThrottlerSlotForSubsequentTasks() {
+ // A queued task that errors must still release its concurrency slot via doFinally,
+ // so the tasks behind it in the queue are not permanently stalled.
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 5);
+
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ // A queued task that will fail when dispatched
+ Mono.from(testee.throttle(
+ Mono.error(new RuntimeException("task error")),
+ NO_IMAP_MESSAGE))
+ .onErrorResume(e -> Mono.empty())
+ .subscribe();
+
+ // A normal task queued after the failing one
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.fromRunnable(() -> executed.set(true)),
NO_IMAP_MESSAGE))
+ .subscribe();
+
+ Awaitility.await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertThat(executed.get()).isTrue());
+ }
+
+ @Test
+ void cancelledQueuedTaskShouldNotPreventSubsequentTaskExecution() throws
Exception {
+ // Verifies that a mix of cancelled and completed queued tasks does not leave
+ // the throttler in a state where later tasks are permanently stalled.
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 20);
+ CountDownLatch blocker = new CountDownLatch(1);
+
+ Mono.from(testee.throttle(
+
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+ NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(
+
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+ NO_IMAP_MESSAGE)).subscribe();
+
+ // Mix: some queued tasks cancelled, some left to run normally.
+ // Short delay so non-cancelled tasks complete well within the test timeout.
+ for (int i = 0; i < 8; i++) {
+ Disposable d =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(100)).then(),
NO_IMAP_MESSAGE))
+ .subscribe();
+ if (i % 2 == 0) {
+ d.dispose();
+ }
+ }
+
+ blocker.countDown();
+ Thread.sleep(1000); // Enough for all 4 non-cancelled 100ms tasks to
drain (2 concurrent max → 200ms min)
+
+ AtomicInteger executionCount = new AtomicInteger(0);
+ for (int i = 0; i < 4; i++) {
+
Mono.from(testee.throttle(Mono.fromRunnable(executionCount::incrementAndGet),
NO_IMAP_MESSAGE))
+ .onErrorResume(ReactiveThrottler.RejectedException.class, e ->
Mono.empty())
+ .subscribe();
+ }
+
+ Awaitility.await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
assertThat(executionCount.get()).isEqualTo(4));
+ }
+
+ @Test
+ void concurrentRequestsGaugeShouldBeZeroWhenIdle() throws Exception {
+ AtomicReference<Gauge<Integer>> concurrentCountGauge = new
AtomicReference<>();
+ GaugeRegistry capturingRegistry = new GaugeRegistry() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> GaugeRegistry register(String name, Gauge<T> gauge) {
+ if (name.equals("imap.request.concurrent.count")) {
+ concurrentCountGauge.set((Gauge<Integer>) gauge);
+ }
+ return this;
+ }
+
+ @Override
+ public <T> GaugeRegistry.SettableGauge<T> settableGauge(String
name) {
+ return value -> { };
+ }
+ };
+ ReactiveThrottler testee = new ReactiveThrottler(capturingRegistry, 2,
10);
+
+ // Stress the counter: fill slots, queue tasks, cancel them all
+ CountDownLatch blocker = new CountDownLatch(1);
+ Mono.from(testee.throttle(
+
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+ NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(
+
Mono.fromRunnable(Throwing.runnable(blocker::await)).subscribeOn(Schedulers.boundedElastic()).then(),
+ NO_IMAP_MESSAGE)).subscribe();
+ for (int i = 0; i < 5; i++) {
+
Mono.from(testee.throttle(Mono.delay(Duration.ofSeconds(10)).then(),
NO_IMAP_MESSAGE))
+ .subscribe()
+ .dispose();
+ }
+ Thread.sleep(100);
+ blocker.countDown();
+
+ // Wait for the throttler to fully drain
+ Awaitility.await().atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
assertThat(concurrentCountGauge.get().get()).isZero());
+ }
+
@Test
void throttleShouldNotExceedItsConcurrency() {
// Given a throttler
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]