This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new bc88192963 JAMES-3900 Ignore errors upon task manager polling updates (#1523) bc88192963 is described below commit bc8819296357ccbd7c253226b4bd568996771a80 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Wed Apr 19 08:25:30 2023 +0700 JAMES-3900 Ignore errors upon task manager polling updates (#1523) --- .../apache/james/task/SerialTaskManagerWorker.java | 7 +++++- .../james/task/SerialTaskManagerWorkerTest.java | 28 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 66b114c870..4b3f8c5e33 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -123,7 +123,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { .delayElement(pollingInterval, Schedulers.parallel()) .repeat() .handle(publishIfPresent()) - .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), Mono.just(information))).thenReturn(information), DEFAULT_CONCURRENCY); + .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), Mono.just(information))) + .thenReturn(information) + .onErrorResume(e -> { + LOGGER.error("Error upon polling additional information updates", e); + return Mono.empty(); + }), DEFAULT_CONCURRENCY); } diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index eaad5c0764..2fdbb72695 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -23,6 +23,7 @@ import static org.awaitility.Durations.TEN_SECONDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.mock; @@ -40,6 +41,8 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -127,6 +130,31 @@ class SerialTaskManagerWorkerTest { verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull()); } + @Test + void errorUponUpdatesShouldNotAbortTheRunningTaskPolledUpdates() { + AtomicInteger updatedCounter = new AtomicInteger(0); + when(listener.updated(any(), any())).thenAnswer(new Answer<Mono<Void>>() { + @Override + public Mono<Void> answer(InvocationOnMock invocationOnMock) { + if (updatedCounter.getAndIncrement() == 1) { + return Mono.error(new RuntimeException()); + } + return Mono.empty(); + } + }); + + TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> + Mono.fromCallable(counter::incrementAndGet) + .delayElement(Duration.ofMillis(1)) + .repeat(600) + .then(Mono.just(Task.Result.COMPLETED)) + .block())); + + worker.executeTask(taskWithId).block(); + + verify(listener, atLeast(3)).updated(eq(taskWithId.getId()), notNull()); + } + @Test void aFailedTaskShouldCompleteWithFailedStatus() { ArgumentCaptor<Publisher<Optional<TaskExecutionDetails.AdditionalInformation>>> additionalInformationPublisherCapture = ArgumentCaptor.forClass(Publisher.class); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org