This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new aad590c27c [ENHANCEMENT] More tests for ReactiveThrottler (#1854)
aad590c27c is described below

commit aad590c27cb564ce7999450812a579737ef8efa2
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Dec 15 08:14:37 2023 +0100

    [ENHANCEMENT] More tests for ReactiveThrottler (#1854)
---
 .../imapserver/netty/ReactiveThrottlerTest.java    | 81 ++++++++++++++++++++++
 1 file changed, 81 insertions(+)

diff --git 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index d107f90ee5..c11ab260a8 100644
--- 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++ 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -29,9 +29,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.james.imap.api.ImapMessage;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 
 class ReactiveThrottlerTest {
@@ -115,6 +118,84 @@ class ReactiveThrottlerTest {
         // And the task is not executed
         assertThat(executed.get()).isFalse();
     }
+    @Test
+    void throttleShouldRecoverFromABurst() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+
+        Thread.sleep(500);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @Test
+    void throttleShouldHandleDisposal() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        // When I submit too many tasks task
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Disposable subscribe1 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe2 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe3 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe4 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe5 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe6 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe7 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        Disposable subscribe8 = 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE)).subscribe();
+        subscribe1.dispose();
+        subscribe2.dispose();
+        subscribe3.dispose();
+        subscribe4.dispose();
+        subscribe5.dispose();
+        subscribe6.dispose();
+        subscribe7.dispose();
+        subscribe8.dispose();
+
+        Thread.sleep(100);
+
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is executed
+        assertThat(executed.get()).isTrue();
+    }
+
+    @RepeatedTest(10)
+    void throttleShouldBeConcurrentFriendly() throws Exception {
+        // Given a throttler
+        ReactiveThrottler testee = new ReactiveThrottler(new 
NoopGaugeRegistry(), 2, 2);
+
+        ConcurrentTestRunner.builder()
+            .operation((a, b) -> 
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(), 
NO_IMAP_MESSAGE))
+                .onErrorResume(ReactiveThrottler.RejectedException.class, e -> 
Mono.empty())
+                .block())
+            .threadCount(20)
+            .operationCount(5)
+            .runSuccessfullyWithin(Duration.ofSeconds(10));
+
+        Thread.sleep(100);
+
+        AtomicBoolean executed = new AtomicBoolean(false);
+        Mono.from(testee.throttle(Mono.fromRunnable(() -> 
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+        // And the task is executed
+        assertThat(executed.get()).isTrue();
+    }
 
     @Test
     void throttleShouldNotAwaitOtherTasks() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to