This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new bc88192963 JAMES-3900 Ignore errors upon task manager polling updates 
(#1523)
bc88192963 is described below

commit bc8819296357ccbd7c253226b4bd568996771a80
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Wed Apr 19 08:25:30 2023 +0700

    JAMES-3900 Ignore errors upon task manager polling updates (#1523)
---
 .../apache/james/task/SerialTaskManagerWorker.java |  7 +++++-
 .../james/task/SerialTaskManagerWorkerTest.java    | 28 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 66b114c870..4b3f8c5e33 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -123,7 +123,12 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
             .delayElement(pollingInterval, Schedulers.parallel())
             .repeat()
             .handle(publishIfPresent())
-            .flatMap(information -> 
Mono.from(listener.updated(taskWithId.getId(), 
Mono.just(information))).thenReturn(information), DEFAULT_CONCURRENCY);
+            .flatMap(information -> 
Mono.from(listener.updated(taskWithId.getId(), Mono.just(information)))
+                .thenReturn(information)
+                .onErrorResume(e -> {
+                    LOGGER.error("Error upon polling additional information 
updates", e);
+                    return Mono.empty();
+                }), DEFAULT_CONCURRENCY);
     }
 
 
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index eaad5c0764..2fdbb72695 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -23,6 +23,7 @@ import static org.awaitility.Durations.TEN_SECONDS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.mock;
@@ -40,6 +41,8 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
@@ -127,6 +130,31 @@ class SerialTaskManagerWorkerTest {
         verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
     }
 
+    @Test
+    void errorUponUpdatesShouldNotAbortTheRunningTaskPolledUpdates() {
+        AtomicInteger updatedCounter = new AtomicInteger(0);
+        when(listener.updated(any(), any())).thenAnswer(new 
Answer<Mono<Void>>() {
+            @Override
+            public Mono<Void> answer(InvocationOnMock invocationOnMock) {
+                if (updatedCounter.getAndIncrement() == 1) {
+                    return Mono.error(new RuntimeException());
+                }
+                return Mono.empty();
+            }
+        });
+
+        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new 
MemoryReferenceWithCounterTask((counter) ->
+            Mono.fromCallable(counter::incrementAndGet)
+                .delayElement(Duration.ofMillis(1))
+                .repeat(600)
+                .then(Mono.just(Task.Result.COMPLETED))
+                .block()));
+
+        worker.executeTask(taskWithId).block();
+
+        verify(listener, atLeast(3)).updated(eq(taskWithId.getId()), 
notNull());
+    }
+
     @Test
     void aFailedTaskShouldCompleteWithFailedStatus() {
         
ArgumentCaptor<Publisher<Optional<TaskExecutionDetails.AdditionalInformation>>> 
additionalInformationPublisherCapture = 
ArgumentCaptor.forClass(Publisher.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to