This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 930a2e3bdd321b889b91e49a37e937f3a5827a21
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu May 7 13:42:44 2020 +0700

    JAMES-3172 Rely on Reactor for task execution
---
 .../apache/james/task/SerialTaskManagerWorker.java | 60 ++++++++++++++--------
 .../james/task/SerialTaskManagerWorkerTest.java    |  7 +--
 2 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 8341a16..5438eff 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -20,16 +20,14 @@ package org.apache.james.task;
 
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.concurrent.NamedThreadFactory;
@@ -42,22 +40,25 @@ import com.google.common.collect.Sets;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SerialTaskManagerWorker.class);
-    private final ExecutorService taskExecutor;
+    public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
+
+    private final Scheduler taskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
+    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> 
runningTask;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
     public SerialTaskManagerWorker(Listener listener, Duration 
pollingInterval) {
         this.pollingInterval = pollingInterval;
-        this.taskExecutor = 
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
+        this.taskExecutor = Schedulers.fromExecutor(
+            
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task 
executor")));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
         this.runningTask = new AtomicReference<>();
@@ -66,7 +67,8 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
         if (!cancelledTasks.remove(taskWithId.getId())) {
-            CompletableFuture<Task.Result> future = 
CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), 
taskExecutor);
+            Mono<Task.Result> taskMono = Mono.fromCallable(() -> 
runWithMdc(taskWithId, listener)).subscribeOn(taskExecutor);
+            CompletableFuture<Task.Result> future = taskMono.toFuture();
             runningTask.set(Tuples.of(taskWithId.getId(), future));
 
             return Mono.using(
@@ -109,22 +111,38 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
 
     private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) {
         return Mono.from(listener.started(taskWithId.getId()))
-            .then(Mono.fromCallable(() -> runTask(taskWithId, listener)))
-            .onErrorResume(InterruptedException.class, e -> 
Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details())).thenReturn(Task.Result.PARTIAL))
+            .then(runTask(taskWithId, listener))
+            .onErrorResume(this::isCausedByInterruptedException, e -> 
cancelled(taskWithId, listener))
             .onErrorResume(Exception.class, e -> {
                 LOGGER.error("Error while running task {}", 
taskWithId.getId(), e);
                 return Mono.from(listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL);
             });
     }
 
-    private Task.Result runTask(TaskWithId taskWithId, Listener listener) 
throws InterruptedException {
-        return taskWithId.getTask()
-            .run()
-            .onComplete(result -> 
Mono.from(listener.completed(taskWithId.getId(), result, 
taskWithId.getTask().details())).block())
-            .onFailure(() -> {
-                LOGGER.error("Task was partially performed. Check logs for 
more details. Taskid : " + taskWithId.getId());
-                Mono.from(listener.failed(taskWithId.getId(), 
taskWithId.getTask().details())).block();
-            });
+    /**
+     * Tells whether the given failure is an {@link InterruptedException} or has one
+     * anywhere in its cause chain.
+     *
+     * @param e the failure to inspect
+     * @return {@code true} if {@code e} or any of its transitive causes is an
+     *         {@link InterruptedException}, {@code false} otherwise
+     */
+    private boolean isCausedByInterruptedException(Throwable e) {
+        // Iterate until the current element is null rather than until its cause is null:
+        // stopping on "getCause() == null" skips the root cause, which is precisely where
+        // a wrapped InterruptedException usually sits.
+        for (Throwable current = e; current != null; current = current.getCause()) {
+            if (current instanceof InterruptedException) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Notifies the listener that the given task was cancelled.
+     *
+     * @param taskWithId the task whose id and details are handed to the listener
+     * @param listener the listener to notify
+     * @return a Mono emitting {@code Task.Result.PARTIAL} once the publisher returned by
+     *         {@code listener.cancelled} has completed
+     */
+    private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener listener) {
+        return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()))
+            .thenReturn(Task.Result.PARTIAL);
+    }
+
+    /**
+     * Wraps the execution of the task in a Mono and reports its outcome to the listener.
+     *
+     * The task itself runs lazily, when the returned Mono is subscribed. Once a result is
+     * produced, the callbacks registered through {@code onComplete} and {@code onFailure}
+     * block on the listener's publishers before the result is passed downstream.
+     *
+     * @param taskWithId the task to run, together with its id
+     * @param listener the listener notified of completion or failure
+     * @return a Mono emitting the result returned by the task
+     */
+    private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener listener) {
+        Mono<Task.Result> execution = Mono.fromCallable(() -> taskWithId.getTask().run());
+
+        return execution.doOnNext(outcome -> {
+            outcome
+                .onComplete(ignored -> Mono.from(listener.completed(taskWithId.getId(), outcome, taskWithId.getTask().details())).block())
+                .onFailure(() -> {
+                    LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId());
+                    Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details())).block();
+                });
+        });
+    }
 
     @Override
@@ -132,7 +150,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
         cancelledTasks.add(taskId);
         Optional.ofNullable(runningTask.get())
             .filter(task -> task.getT1().equals(taskId))
-            .ifPresent(task -> task.getT2().cancel(true));
+            .ifPresent(task -> task.getT2().cancel(MAY_INTERRUPT_IF_RUNNING));
     }
 
     @Override
@@ -141,7 +159,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     }
 
     @Override
-    public void close() throws IOException {
-        taskExecutor.shutdownNow();
+    public void close() {
+        taskExecutor.dispose();
     }
 }
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 0065546..ed17928 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
@@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import reactor.core.publisher.Flux;
@@ -69,7 +67,7 @@ class SerialTaskManagerWorkerTest {
     }
 
     @AfterEach
-    void tearDown() throws IOException {
+    void tearDown() {
         worker.close();
     }
 
@@ -101,7 +99,7 @@ class SerialTaskManagerWorkerTest {
     }
 
     @Test
-    void aRunningTaskShouldHaveAFiniteNumberOfInformation() throws 
InterruptedException {
+    void aRunningTaskShouldHaveAFiniteNumberOfInformation() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new 
MemoryReferenceWithCounterTask((counter) ->
             Mono.fromCallable(counter::incrementAndGet)
                 .delayElement(Duration.ofSeconds(1))
@@ -170,7 +168,6 @@ class SerialTaskManagerWorkerTest {
         latch.countDown();
     }
 
-    @Disabled("JAMES-3172 We cannot cancel computation started by Reactor")
     @Test
     void taskExecutingReactivelyShouldStopExecutionUponCancel() throws 
InterruptedException {
         // Provide a task ticking every 100ms in a separate reactor thread


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to