This is an automated email from the ASF dual-hosted git repository. kao pushed a commit to branch 3.7.x in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.7.x by this push: new 45ee0bc02b JAMES-3890 Allow parallel execution of safe tasks (backport) 45ee0bc02b is described below commit 45ee0bc02b0688d2bf85e749c750991361444bb5 Author: Karsten Otto <karsten.o...@akquinet.de> AuthorDate: Mon Feb 27 08:48:28 2023 +0100 JAMES-3890 Allow parallel execution of safe tasks (backport) --- server/task/task-api/pom.xml | 4 + .../java/org/apache/james/task/AsyncSafeTask.java | 29 +++++++ .../src/main/java/org/apache/james/task/Task.java | 24 +++++- .../apache/james/task/SerialTaskManagerWorker.java | 49 ++++++++---- .../james/task/SerialTaskManagerWorkerTest.java | 89 ++++++++++++++++++++++ 5 files changed, 178 insertions(+), 17 deletions(-) diff --git a/server/task/task-api/pom.xml b/server/task/task-api/pom.xml index 59cd056ade..916325f749 100644 --- a/server/task/task-api/pom.xml +++ b/server/task/task-api/pom.xml @@ -47,6 +47,10 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> diff --git a/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java b/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java new file mode 100644 index 0000000000..504fc35a99 --- /dev/null +++ b/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java @@ -0,0 +1,29 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task; + +/** + * Marker interface for a task that can safely run in parallel with other tasks. + * This means it will not likely interfere with the operations of other tasks, and be able to handle issues arising + * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully + * handle the situation when they have been deleted in the meantime. + */ +public interface AsyncSafeTask extends Task { +} diff --git a/server/task/task-api/src/main/java/org/apache/james/task/Task.java b/server/task/task-api/src/main/java/org/apache/james/task/Task.java index ac8c2116a9..6e2a377b89 100644 --- a/server/task/task-api/src/main/java/org/apache/james/task/Task.java +++ b/server/task/task-api/src/main/java/org/apache/james/task/Task.java @@ -23,9 +23,13 @@ import java.util.Arrays; import java.util.Optional; import java.util.stream.Stream; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public interface Task { Logger LOGGER = LoggerFactory.getLogger(Task.class); @@ -86,11 +90,25 @@ public interface Task { } /** - * Runs the migration + * Runs the actual task as a blocking operation. + * Must implement either this or #runAsync(). * - * @return Return true if fully migrated. Returns false otherwise. + * @return complete or partial operation result. */ - Result run() throws InterruptedException; + default Result run() throws InterruptedException { + return Mono.from(runAsync()).block(); + } + + /** + * Runs the actual task as an asynchronous operation. + * Must implement either this or #run(). + * + * @return Publisher supplying complete or partial operation result. + */ + default Publisher<Result> runAsync() { + return Mono.fromCallable(this::run) + .subscribeOn(Schedulers.elastic()); + } TaskType type(); diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index e7b6107d75..208346c19b 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -22,12 +22,12 @@ import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; import static org.apache.james.util.ReactorUtils.publishIfPresent; import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.apache.james.util.MDCBuilder; @@ -36,6 +36,7 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import reactor.core.Disposable; @@ -43,16 +44,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; public class SerialTaskManagerWorker implements TaskManagerWorker { private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class); public static final boolean MAY_INTERRUPT_IF_RUNNING = true; private final Scheduler taskExecutor; + private final Scheduler asyncTaskExecutor; private final Listener listener; - private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask; + private final Map<TaskId, CompletableFuture<Task.Result>> runningTasks; private final Set<TaskId> cancelledTasks; private final Duration pollingInterval; @@ -60,33 +60,54 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { this.pollingInterval = pollingInterval; this.taskExecutor = Schedulers.fromExecutor( Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"))); + this.asyncTaskExecutor = Schedulers.fromExecutor( + Executors.newCachedThreadPool(NamedThreadFactory.withName("async task executor"))); this.listener = listener; this.cancelledTasks = Sets.newConcurrentHashSet(); - this.runningTask = new AtomicReference<>(); + this.runningTasks = Maps.newConcurrentMap(); } @Override public Mono<Task.Result> executeTask(TaskWithId taskWithId) { if (!cancelledTasks.remove(taskWithId.getId())) { - Mono<Task.Result> taskMono = Mono.fromCallable(() -> runWithMdc(taskWithId, listener)).subscribeOn(taskExecutor); + Mono<Task.Result> taskMono = Mono.fromCallable(() -> runWithMdc(taskWithId, listener)).subscribeOn(schedulerForTask(taskWithId)); CompletableFuture<Task.Result> future = taskMono.toFuture(); - runningTask.set(Tuples.of(taskWithId.getId(), future)); + runningTasks.put(taskWithId.getId(), future); - return Mono.using( + Mono<Task.Result> pollingMono = Mono.using( () -> pollAdditionalInformation(taskWithId).subscribe(), ignored -> Mono.fromFuture(future) .onErrorResume(exception -> Mono.from(handleExecutionError(taskWithId, listener, exception)) .thenReturn(Task.Result.PARTIAL)), - Disposable::dispose); + Disposable::dispose) + .doOnTerminate(() -> runningTasks.remove(taskWithId.getId())); + + if (taskWithId.getTask() instanceof AsyncSafeTask) { + pollingMono.subscribe(); + return Mono.empty(); + } else { + return pollingMono; + } } else { return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())) + .doOnTerminate(() -> runningTasks.remove(taskWithId.getId())) .then(Mono.empty()); } } + private Scheduler schedulerForTask(TaskWithId taskWithId) { + if (taskWithId.getTask() instanceof AsyncSafeTask) { + return asyncTaskExecutor; + } else { + return taskExecutor; + } + } + private Publisher<Void> handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) { if (exception instanceof CancellationException) { - return listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); + return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())) + .then(Mono.fromCallable(() -> cancelledTasks.remove(taskWithId.getId()))) + .then(); } else { return listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception); } @@ -137,7 +158,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { } private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener listener) { - return Mono.fromCallable(() -> taskWithId.getTask().run()) + return Mono.from(taskWithId.getTask().runAsync()) .doOnNext(result -> result .onComplete(any -> Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block()) .onFailure(() -> { @@ -149,9 +170,8 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { @Override public void cancelTask(TaskId taskId) { cancelledTasks.add(taskId); - Optional.ofNullable(runningTask.get()) - .filter(task -> task.getT1().equals(taskId)) - .ifPresent(task -> task.getT2().cancel(MAY_INTERRUPT_IF_RUNNING)); + Optional.ofNullable(runningTasks.get(taskId)) + .ifPresent(task -> task.cancel(MAY_INTERRUPT_IF_RUNNING)); } @Override @@ -162,5 +182,6 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { @Override public void close() { taskExecutor.dispose(); + asyncTaskExecutor.dispose(); } } diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 77c2be8803..7025ca6b72 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -233,7 +233,96 @@ class SerialTaskManagerWorkerTest { verify(listener, atLeastOnce()).cancelled(id, Optional.empty()); verifyNoMoreInteractions(listener); } + + @Test + void theWorkerShouldCancelAnInProgressAsyncTask() throws InterruptedException { + TaskId id = TaskId.generateTaskId(); + CountDownLatch latch = new CountDownLatch(1); + + Task inProgressTask = new AsyncSafeTask() { + @Override + public Mono<Result> runAsync() { + return Mono.fromCallable(() -> { + await(latch); + return Task.Result.COMPLETED; + }); + } + + @Override + public TaskType type() { + return TaskType.of("async memory task"); + } + }; + + TaskWithId taskWithId = new TaskWithId(id, inProgressTask); + + Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache(); + resultMono.subscribe(); + + Awaitility.waitAtMost(TEN_SECONDS) + .untilAsserted(() -> verify(listener, atLeastOnce()).started(id)); + + worker.cancelTask(id); + resultMono.block(Duration.ofSeconds(10)); + + // Due to the use of signals, cancellation cannot be instantaneous + // Let a grace period for the cancellation to complete to increase test stability + Thread.sleep(50); + + verify(listener, atLeastOnce()).cancelled(eq(id), any()); + verifyNoMoreInteractions(listener); + } + + @Test + void theWorkerShouldRunAsyncTasksInParallel() throws InterruptedException { + TaskId id1 = TaskId.generateTaskId(); + TaskId id2 = TaskId.generateTaskId(); + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch task1Started = new CountDownLatch(1); + CountDownLatch task2Started = new CountDownLatch(1); + + Task inProgressTask1 = new AsyncSafeTask() { + @Override + public Mono<Result> runAsync() { + return Mono.fromCallable(() -> { + task1Started.countDown(); + await(latch); + return Task.Result.COMPLETED; + }); + } + + @Override + public TaskType type() { + return TaskType.of("async memory task"); + } + }; + + Task inProgressTask2 = new AsyncSafeTask() { + @Override + public Mono<Result> runAsync() { + return Mono.fromCallable(() -> { + task2Started.countDown(); + return Task.Result.COMPLETED; + }); + } + + @Override + public TaskType type() { + return TaskType.of("async memory task"); + } + }; + + worker.executeTask(new TaskWithId(id1, inProgressTask1)).subscribe(); + await(task1Started); + + worker.executeTask(new TaskWithId(id2, inProgressTask2)).subscribe(); + await(task2Started); + + verify(listener, atLeastOnce()).started(id1); + verify(listener, atLeastOnce()).started(id2); + latch.countDown(); + } private void await(CountDownLatch countDownLatch) throws InterruptedException { countDownLatch.await(); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org