This is an automated email from the ASF dual-hosted git repository.

kao pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.7.x by this push:
     new 45ee0bc02b JAMES-3890 Allow parallel execution of safe tasks (backport)
45ee0bc02b is described below

commit 45ee0bc02b0688d2bf85e749c750991361444bb5
Author: Karsten Otto <karsten.o...@akquinet.de>
AuthorDate: Mon Feb 27 08:48:28 2023 +0100

    JAMES-3890 Allow parallel execution of safe tasks (backport)
---
 server/task/task-api/pom.xml                       |  4 +
 .../java/org/apache/james/task/AsyncSafeTask.java  | 29 +++++++
 .../src/main/java/org/apache/james/task/Task.java  | 24 +++++-
 .../apache/james/task/SerialTaskManagerWorker.java | 49 ++++++++----
 .../james/task/SerialTaskManagerWorkerTest.java    | 89 ++++++++++++++++++++++
 5 files changed, 178 insertions(+), 17 deletions(-)

diff --git a/server/task/task-api/pom.xml b/server/task/task-api/pom.xml
index 59cd056ade..916325f749 100644
--- a/server/task/task-api/pom.xml
+++ b/server/task/task-api/pom.xml
@@ -47,6 +47,10 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git a/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java b/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java
new file mode 100644
index 0000000000..504fc35a99
--- /dev/null
+++ b/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+/**
+ * Marker interface for a task that can safely run in parallel with other tasks.
+ * This means it is unlikely to interfere with the operations of other tasks, and is able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+}
diff --git a/server/task/task-api/src/main/java/org/apache/james/task/Task.java b/server/task/task-api/src/main/java/org/apache/james/task/Task.java
index ac8c2116a9..6e2a377b89 100644
--- a/server/task/task-api/src/main/java/org/apache/james/task/Task.java
+++ b/server/task/task-api/src/main/java/org/apache/james/task/Task.java
@@ -23,9 +23,13 @@ import java.util.Arrays;
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public interface Task {
     Logger LOGGER = LoggerFactory.getLogger(Task.class);
 
@@ -86,11 +90,25 @@ public interface Task {
     }
 
     /**
-     * Runs the migration
+     * Runs the actual task as a blocking operation.
+     * Implementations must override either this method or #runAsync(), since the two defaults delegate to each other.
      *
-     * @return Return true if fully migrated. Returns false otherwise.
+     * @return complete or partial operation result.
      */
-    Result run() throws InterruptedException;
+    default Result run() throws InterruptedException {
+        return Mono.from(runAsync()).block();
+    }
+
+    /**
+     * Runs the actual task as an asynchronous operation.
+     * Implementations must override either this method or #run(), since the two defaults delegate to each other.
+     *
+     * @return Publisher supplying complete or partial operation result.
+     */
+    default Publisher<Result> runAsync() {
+        return Mono.fromCallable(this::run)
+            .subscribeOn(Schedulers.elastic());
+    }
 
     TaskType type();
 
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index e7b6107d75..208346c19b 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -22,12 +22,12 @@ import static 
org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import org.apache.james.util.MDCBuilder;
@@ -36,6 +36,7 @@ import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import reactor.core.Disposable;
@@ -43,16 +44,15 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 public class SerialTaskManagerWorker implements TaskManagerWorker {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SerialTaskManagerWorker.class);
     public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
 
     private final Scheduler taskExecutor;
+    private final Scheduler asyncTaskExecutor;
     private final Listener listener;
-    private final AtomicReference<Tuple2<TaskId, CompletableFuture>> 
runningTask;
+    private final Map<TaskId, CompletableFuture<Task.Result>> runningTasks;
     private final Set<TaskId> cancelledTasks;
     private final Duration pollingInterval;
 
@@ -60,33 +60,54 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
         this.pollingInterval = pollingInterval;
         this.taskExecutor = Schedulers.fromExecutor(
             
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task 
executor")));
+        this.asyncTaskExecutor = Schedulers.fromExecutor(
+            Executors.newCachedThreadPool(NamedThreadFactory.withName("async 
task executor")));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
-        this.runningTask = new AtomicReference<>();
+        this.runningTasks = Maps.newConcurrentMap();
     }
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
         if (!cancelledTasks.remove(taskWithId.getId())) {
-            Mono<Task.Result> taskMono = Mono.fromCallable(() -> 
runWithMdc(taskWithId, listener)).subscribeOn(taskExecutor);
+            Mono<Task.Result> taskMono = Mono.fromCallable(() -> 
runWithMdc(taskWithId, listener)).subscribeOn(schedulerForTask(taskWithId));
             CompletableFuture<Task.Result> future = taskMono.toFuture();
-            runningTask.set(Tuples.of(taskWithId.getId(), future));
+            runningTasks.put(taskWithId.getId(), future);
 
-            return Mono.using(
+            Mono<Task.Result> pollingMono = Mono.using(
                 () -> pollAdditionalInformation(taskWithId).subscribe(),
                 ignored -> Mono.fromFuture(future)
                     .onErrorResume(exception -> 
Mono.from(handleExecutionError(taskWithId, listener, exception))
                             .thenReturn(Task.Result.PARTIAL)),
-                Disposable::dispose);
+                Disposable::dispose)
+                .doOnTerminate(() -> runningTasks.remove(taskWithId.getId()));
+
+            if (taskWithId.getTask() instanceof AsyncSafeTask) {
+                pollingMono.subscribe();
+                return Mono.empty();
+            } else {
+                return pollingMono;
+            }
         } else {
             return Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details()))
+                .doOnTerminate(() -> runningTasks.remove(taskWithId.getId()))
                 .then(Mono.empty());
         }
     }
 
+    private Scheduler schedulerForTask(TaskWithId taskWithId) {
+        if (taskWithId.getTask() instanceof AsyncSafeTask) {
+            return asyncTaskExecutor;
+        } else {
+            return taskExecutor;
+        }
+    }
+
     private Publisher<Void> handleExecutionError(TaskWithId taskWithId, 
Listener listener, Throwable exception) {
         if (exception instanceof CancellationException) {
-            return listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
+            return Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details()))
+                .then(Mono.fromCallable(() -> 
cancelledTasks.remove(taskWithId.getId())))
+                .then();
         } else {
             return listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), exception);
         }
@@ -137,7 +158,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     }
 
     private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener 
listener) {
-        return Mono.fromCallable(() -> taskWithId.getTask().run())
+        return Mono.from(taskWithId.getTask().runAsync())
             .doOnNext(result -> result
                 .onComplete(any -> 
Mono.from(listener.completed(taskWithId.getId(), result, 
taskWithId.getTask().details())).block())
                 .onFailure(() -> {
@@ -149,9 +170,8 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     @Override
     public void cancelTask(TaskId taskId) {
         cancelledTasks.add(taskId);
-        Optional.ofNullable(runningTask.get())
-            .filter(task -> task.getT1().equals(taskId))
-            .ifPresent(task -> task.getT2().cancel(MAY_INTERRUPT_IF_RUNNING));
+        Optional.ofNullable(runningTasks.get(taskId))
+            .ifPresent(task -> task.cancel(MAY_INTERRUPT_IF_RUNNING));
     }
 
     @Override
@@ -162,5 +182,6 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     @Override
     public void close() {
         taskExecutor.dispose();
+        asyncTaskExecutor.dispose();
     }
 }
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 77c2be8803..7025ca6b72 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -233,7 +233,96 @@ class SerialTaskManagerWorkerTest {
         verify(listener, atLeastOnce()).cancelled(id, Optional.empty());
         verifyNoMoreInteractions(listener);
     }
+    
+    @Test
+    void theWorkerShouldCancelAnInProgressAsyncTask() throws 
InterruptedException {
+        TaskId id = TaskId.generateTaskId();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        Task inProgressTask = new AsyncSafeTask() {
+            @Override
+            public Mono<Result> runAsync() {
+                return Mono.fromCallable(() -> {
+                    await(latch);
+                    return Task.Result.COMPLETED;
+                });
+            }
+
+            @Override
+            public TaskType type() {
+                return TaskType.of("async memory task");
+            }
+        };
+
+        TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+
+        Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
+        resultMono.subscribe();
+
+        Awaitility.waitAtMost(TEN_SECONDS)
+            .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
+
+        worker.cancelTask(id);
 
+        resultMono.block(Duration.ofSeconds(10));
+
+        // Due to the use of signals, cancellation cannot be instantaneous
+        // Allow a grace period for the cancellation to complete, to increase test stability
+        Thread.sleep(50);
+
+        verify(listener, atLeastOnce()).cancelled(eq(id), any());
+        verifyNoMoreInteractions(listener);
+    }
+
+    @Test
+    void theWorkerShouldRunAsyncTasksInParallel() throws InterruptedException {
+        TaskId id1 = TaskId.generateTaskId();
+        TaskId id2 = TaskId.generateTaskId();
+        CountDownLatch latch = new CountDownLatch(1);
+        CountDownLatch task1Started = new CountDownLatch(1);
+        CountDownLatch task2Started = new CountDownLatch(1);
+
+        Task inProgressTask1 = new AsyncSafeTask() {
+            @Override
+            public Mono<Result> runAsync() {
+                return Mono.fromCallable(() -> {
+                    task1Started.countDown();
+                    await(latch);
+                    return Task.Result.COMPLETED;
+                });
+            }
+
+            @Override
+            public TaskType type() {
+                return TaskType.of("async memory task");
+            }
+        };
+
+        Task inProgressTask2 = new AsyncSafeTask() {
+            @Override
+            public Mono<Result> runAsync() {
+                return Mono.fromCallable(() -> {
+                    task2Started.countDown();
+                    return Task.Result.COMPLETED;
+                });
+            }
+
+            @Override
+            public TaskType type() {
+                return TaskType.of("async memory task");
+            }
+        };
+
+        worker.executeTask(new TaskWithId(id1, inProgressTask1)).subscribe();
+        await(task1Started);
+
+        worker.executeTask(new TaskWithId(id2, inProgressTask2)).subscribe();
+        await(task2Started);
+
+        verify(listener, atLeastOnce()).started(id1);
+        verify(listener, atLeastOnce()).started(id2);
+        latch.countDown();
+    }
 
     private void await(CountDownLatch countDownLatch) throws 
InterruptedException {
         countDownLatch.await();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to