quantranhong1999 commented on code in PR #1854:
URL: https://github.com/apache/james-project/pull/1854#discussion_r1426495110
##########
server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java:
##########
@@ -115,6 +118,82 @@ void throttleShouldRejectTasksWhenTheQueueIsFull() {
// And the task is not executed
assertThat(executed.get()).isFalse();
}
+ @Test
+ void throttleShouldRecoverFromABurst() throws Exception {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 2);
+
+ // When I submit too many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ Thread.sleep(500);
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+ // And the task is not executed
+ assertThat(executed.get()).isTrue();
+ }
+ @Test
+ void throttleShouldHandleDisposal() throws Exception {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 2);
+
+ // When I submit too many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Disposable subscribe1 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe2 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe3 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe4 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe5 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe6 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe7 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe8 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ subscribe1.dispose();
+ subscribe2.dispose();
+ subscribe3.dispose();
+ subscribe4.dispose();
+ subscribe5.dispose();
+ subscribe6.dispose();
+ subscribe7.dispose();
+ subscribe8.dispose();
+
+ Thread.sleep(100);
+
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+ // And the task is not executed
+ assertThat(executed.get()).isTrue();
+ }
+ @RepeatedTest(10)
+ void throttleShouldBeConcurrentFriendly() throws Exception {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 2);
+
+ ConcurrentTestRunner.builder()
+ .operation((a, b) ->
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE))
+ .onErrorResume(ReactiveThrottler.RejectedException.class, e ->
Mono.empty())
+ .block())
+ .threadCount(20)
+ .operationCount(5)
+ .runSuccessfullyWithin(Duration.ofSeconds(10));
+
+ Thread.sleep(100);
+
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+ // And the task is not executed
+ assertThat(executed.get()).isTrue();
Review Comment:
```suggestion
// And the task is executed
assertThat(executed.get()).isTrue();
```
##########
server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java:
##########
@@ -115,6 +118,82 @@ void throttleShouldRejectTasksWhenTheQueueIsFull() {
// And the task is not executed
assertThat(executed.get()).isFalse();
}
+ @Test
+ void throttleShouldRecoverFromABurst() throws Exception {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 2);
+
+ // When I submit too many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ Thread.sleep(500);
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+ // And the task is not executed
+ assertThat(executed.get()).isTrue();
+ }
+ @Test
+ void throttleShouldHandleDisposal() throws Exception {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 2);
+
+ // When I submit too many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Disposable subscribe1 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe2 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe3 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe4 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe5 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe6 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe7 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable subscribe8 =
Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ subscribe1.dispose();
+ subscribe2.dispose();
+ subscribe3.dispose();
+ subscribe4.dispose();
+ subscribe5.dispose();
+ subscribe6.dispose();
+ subscribe7.dispose();
+ subscribe8.dispose();
+
+ Thread.sleep(100);
+
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+ // And the task is not executed
+ assertThat(executed.get()).isTrue();
Review Comment:
```suggestion
// And the task is executed
assertThat(executed.get()).isTrue();
```
##########
server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java:
##########
@@ -115,6 +118,82 @@ void throttleShouldRejectTasksWhenTheQueueIsFull() {
// And the task is not executed
assertThat(executed.get()).isFalse();
}
+ @Test
+ void throttleShouldRecoverFromABurst() throws Exception {
+ // Given a throttler
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 2);
+
+ // When I submit too many tasks task
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Mono.from(testee.throttle(Mono.delay(Duration.ofMillis(50)).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ Thread.sleep(500);
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+ // And the task is not executed
+ assertThat(executed.get()).isTrue();
Review Comment:
```suggestion
// And the task is executed
assertThat(executed.get()).isTrue();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]