This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6a8e0d402d3d91727bc34c2bd2719b3aeab0a87b
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Tue Jun 30 15:29:44 2020 +0700

    JAMES-3184 Throttling should survive errors

    fixup! JAMES-3184 Throttling should survive errors
---
 .../main/java/org/apache/james/util/ReactorUtils.java |  8 ++++++--
 .../java/org/apache/james/util/ReactorUtilsTest.java  | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 5ec7b9d..b516b8e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -52,10 +52,14 @@ public class ReactorUtils {
             Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
 
             return flux -> flux
+                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while generating throttled entries", e))
                 .window(elements)
                 .delayElements(duration)
-                .concatMap(window -> window.flatMap(operation))
-                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
+                .concatMap(window -> window.flatMap(operation)
+                    .onErrorResume(e -> {
+                        LOGGER.error("Error encountered while throttling", e);
+                        return Mono.empty();
+                    }));
         };
     }
 
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 0ffbc1a..f7fc589 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -209,6 +209,25 @@ class ReactorUtilsTest {
     }
 
     @Test
+    void throttleShouldNotOverwriteErrorHandling() {
+        int windowMaxSize = 3;
+        Duration windowDuration = Duration.ofMillis(20);
+
+        Flux<Long> originalFlux = Flux.just(0L);
+        ConcurrentLinkedDeque<Throwable> recordedExceptions = new ConcurrentLinkedDeque<>();
+
+        originalFlux
+            .transform(ReactorUtils.<Long, Long>throttle()
+                .elements(windowMaxSize)
+                .per(windowDuration)
+                .forOperation(any -> Mono.<Long>error(new RuntimeException())
+                    .onErrorResume(e -> Mono.fromRunnable(() -> recordedExceptions.add(e)).thenReturn(any))))
+            .blockLast();
+
+        assertThat(recordedExceptions).hasSize(1);
+    }
+
+    @Test
     void throttleShouldHandleLargeFluxes() {
         int windowMaxSize = 2;
         Duration windowDuration = Duration.ofMillis(1);

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org