This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 8e3eb60 JAMES-2272 enforce responsibilities segregation for Worker/Manager/WorkQueue 8e3eb60 is described below commit 8e3eb6098fe05a3da582c98fca67f62a90bac16f Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Tue Jul 2 11:33:16 2019 +0200 JAMES-2272 enforce responsibilities segregation for Worker/Manager/WorkQueue The WorkQueue is only responsible for submitting and canceling tasks The Worker manages the run of a task --- .../org/apache/james/task/MemoryTaskManager.java | 61 ++------- .../apache/james/task/MemoryTaskManagerWorker.java | 107 --------------- .../apache/james/task/SerialTaskManagerWorker.java | 144 +++++++++++++++++++++ .../src/main/java/org/apache/james/task/Task.java | 28 +++- .../apache/james/task/TaskExecutionDetails.java | 130 +++++++++++-------- .../org/apache/james/task/TaskManagerWorker.java | 9 +- .../main/java/org/apache/james/task/WorkQueue.java | 91 +++++-------- ...rTest.java => SerialTaskManagerWorkerTest.java} | 46 ++++--- .../org/apache/james/task/TaskManagerContract.java | 89 ++++++------- 9 files changed, 350 insertions(+), 355 deletions(-) diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index 39d7c35..6b4574f 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -24,9 +24,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiConsumer; import java.util.function.Consumer; import javax.annotation.PreDestroy; @@ -34,7 +32,6 @@ import javax.annotation.PreDestroy; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import 
reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class MemoryTaskManager implements TaskManager { @@ -53,8 +50,9 @@ public class MemoryTaskManager implements TaskManager { } @Override - public void completed() { + public void completed(Task.Result result) { updater.accept(TaskExecutionDetails::completed); + } @Override @@ -78,47 +76,19 @@ public class MemoryTaskManager implements TaskManager { private final WorkQueue workQueue; private final TaskManagerWorker worker; private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails; - private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult; public MemoryTaskManager() { idToExecutionDetails = new ConcurrentHashMap<>(); - tasksResult = new ConcurrentHashMap<>(); - worker = new MemoryTaskManagerWorker(); - workQueue = WorkQueue.builder() - .worker(this::treatTask) - .listener(this::listenToWorkQueueEvents); - } - - private void listenToWorkQueueEvents(WorkQueue.Event event) { - switch (event.status) { - case CANCELLED: - updateDetails(event.id).accept(TaskExecutionDetails::cancelEffectively); - break; - case STARTED: - break; - } - } - - private void treatTask(TaskWithId task) { - DetailsUpdater detailsUpdater = updater(task.getId()); - Mono<Task.Result> result = worker.executeTask(task, detailsUpdater); - tasksResult.put(task.getId(), result); - try { - BiConsumer<Throwable, Object> ignoreException = (t, o) -> { }; - result - .onErrorContinue(InterruptedException.class, ignoreException) - .block(); - } catch (CancellationException e) { - // Do not throw CancellationException - } + worker = new SerialTaskManagerWorker(); + workQueue = WorkQueue.builder().worker(worker); } public TaskId submit(Task task) { TaskId taskId = TaskId.generateTaskId(); TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId); idToExecutionDetails.put(taskId, executionDetails); - workQueue.submit(new TaskWithId(taskId, 
task)); + workQueue.submit(new TaskWithId(taskId, task), updater(taskId)); return taskId; } @@ -148,11 +118,8 @@ public class MemoryTaskManager implements TaskManager { @Override public void cancel(TaskId id) { Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> { - if (details.getStatus().equals(Status.WAITING)) { - updateDetails(id).accept(TaskExecutionDetails::cancelRequested); - } + updateDetails(id).accept(TaskExecutionDetails::cancelRequested); workQueue.cancel(id); - worker.cancelTask(id, updater(id)); } ); } @@ -161,18 +128,10 @@ public class MemoryTaskManager implements TaskManager { public TaskExecutionDetails await(TaskId id) { if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) { return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()) - .filter(ignore -> tasksResult.get(id) != null) - .map(ignore -> { - Optional.ofNullable(tasksResult.get(id)) - .ifPresent(mono -> { - try { - mono.block(); - } catch (CancellationException e) { - // ignore - } - }); - return getExecutionDetails(id); - }) + .map(ignored -> getExecutionDetails(id)) + .filter(details -> details.getStatus() == Status.COMPLETED + || details.getStatus() == Status.FAILED + || details.getStatus() == Status.CANCELLED) .take(1) .blockFirst(); } else { diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java deleted file mode 100644 index 4dda777..0000000 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java +++ /dev/null @@ -1,107 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. 
The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.task; - -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.james.util.FunctionalUtils; -import org.apache.james.util.MDCBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class MemoryTaskManagerWorker implements TaskManagerWorker { - - private static final boolean INTERRUPT_IF_RUNNING = true; - private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class); - private static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100); - private static final int FIRST = 1; - - private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>(); - - @Override - public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener) { - CompletableFuture<Task.Result> futureResult = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener)); - - idToFuture.put(taskWithId.getId(), futureResult); - - return Mono.fromFuture(futureResult) - .doOnError(res -> { - if (!(res instanceof CancellationException)) { - 
listener.failed(res); - LOGGER.error("Task was partially performed. Check logs for more details", res); - } - }) - .doOnTerminate(() -> idToFuture.remove(taskWithId.getId())); - } - - private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) { - return MDCBuilder.withMdc( - MDCBuilder.create() - .addContext(Task.TASK_ID, taskWithId.getId()) - .addContext(Task.TASK_TYPE, taskWithId.getTask().type()) - .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()), - () -> run(taskWithId, listener)); - } - - - private Task.Result run(TaskWithId taskWithId, Listener listener) { - listener.started(); - try { - return taskWithId.getTask() - .run() - .onComplete(listener::completed) - .onFailure(() -> { - LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); - listener.failed(); - }); - } catch (Exception e) { - LOGGER.error("Error while running task {}", taskWithId.getId(), e); - listener.failed(e); - return Task.Result.PARTIAL; - } - } - - @Override - public void cancelTask(TaskId id, Listener listener) { - Optional.ofNullable(idToFuture.remove(id)) - .ifPresent(future -> { - requestCancellation(future); - waitUntilFutureIsCancelled(future).blockFirst(); - listener.cancelled(); - }); - } - - private void requestCancellation(CompletableFuture<Task.Result> future) { - future.cancel(INTERRUPT_IF_RUNNING); - } - - private Flux<Boolean> waitUntilFutureIsCancelled(CompletableFuture<Task.Result> future) { - return Flux.interval(CHECK_CANCELED_PERIOD) - .map(ignore -> future.isCancelled()) - .filter(FunctionalUtils.identityPredicate()) - .take(FIRST); - } -} diff --git a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java new file mode 100644 index 0000000..d690be2 --- /dev/null +++ b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -0,0 +1,144 @@ 
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.task; + +import java.io.IOException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.apache.james.util.MDCBuilder; +import org.apache.james.util.concurrent.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +public class SerialTaskManagerWorker implements TaskManagerWorker { + + private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class); + private final 
ExecutorService taskExecutor; + private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask; + private final Semaphore semaphore; + private final Set<TaskId> cancelledTasks; + + SerialTaskManagerWorker() { + this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")); + this.cancelledTasks = Sets.newConcurrentHashSet(); + this.runningTask = new AtomicReference<>(); + this.semaphore = new Semaphore(1); + } + + @Override + public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener) { + return Mono + .using( + acquireSemaphore(taskWithId, listener), + executeWithSemaphore(taskWithId, listener), + Semaphore::release); + + } + + private Callable<Semaphore> acquireSemaphore(TaskWithId taskWithId, Listener listener) { + return () -> { + try { + semaphore.acquire(); + return semaphore; + } catch (InterruptedException e) { + listener.cancelled(); + throw e; + } + }; + } + + private Function<Semaphore, Mono<Task.Result>> executeWithSemaphore(TaskWithId taskWithId, Listener listener) { + return semaphore -> { + if (!cancelledTasks.remove(taskWithId.getId())) { + CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor); + runningTask.set(Tuples.of(taskWithId.getId(), future)); + + return Mono.fromFuture(future) + .doOnError(exception -> { + if (exception instanceof CancellationException) { + listener.cancelled(); + } else { + listener.failed(exception); + } + }) + .onErrorReturn(Task.Result.PARTIAL); + } else { + listener.cancelled(); + return Mono.empty(); + } + }; + } + + private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) { + return MDCBuilder.withMdc( + MDCBuilder.create() + .addContext(Task.TASK_ID, taskWithId.getId()) + .addContext(Task.TASK_TYPE, taskWithId.getTask().type()) + .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()), + () -> run(taskWithId, listener)); + } + + private Task.Result run(TaskWithId 
taskWithId, Listener listener) { + listener.started(); + try { + return taskWithId.getTask() + .run() + .onComplete(listener::completed) + .onFailure(() -> { + LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); + listener.failed(); + }); + } catch (InterruptedException e) { + listener.cancelled(); + return Task.Result.PARTIAL; + } catch (Exception e) { + LOGGER.error("Error while running task {}", taskWithId.getId(), e); + listener.failed(e); + return Task.Result.PARTIAL; + } + } + + @Override + public void cancelTask(TaskId taskId) { + cancelledTasks.add(taskId); + Optional.ofNullable(runningTask.get()) + .filter(task -> task.getT1().equals(taskId)) + .ifPresent(task -> task.getT2().cancel(true)); + } + + @Override + public void close() throws IOException { + taskExecutor.shutdownNow(); + } +} diff --git a/server/task/src/main/java/org/apache/james/task/Task.java b/server/task/src/main/java/org/apache/james/task/Task.java index 0c79ec0..2edb39d 100644 --- a/server/task/src/main/java/org/apache/james/task/Task.java +++ b/server/task/src/main/java/org/apache/james/task/Task.java @@ -21,6 +21,7 @@ package org.apache.james.task; import java.util.Arrays; import java.util.Optional; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +33,23 @@ public interface Task { void run(); } + interface CompletionOperation { + void complete(Result result); + } + enum Result { COMPLETED, PARTIAL; - public Result onComplete(Operation... operation) { + public Result onComplete(Operation... operations) { + return onComplete(Arrays.stream(operations).map(this::wrap)); + } + + public Result onComplete(CompletionOperation... 
operation) { + return onComplete(Arrays.stream(operation)); + } + + private Result onComplete(Stream<CompletionOperation> operation) { try { if (this == COMPLETED) { run(operation); @@ -48,16 +61,19 @@ public interface Task { } } - public Result onFailure(Operation... operation) { + public Result onFailure(Operation... operations) { if (this == PARTIAL) { - run(operation); + run(Arrays.stream(operations).map(this::wrap)); } return this; } - private void run(Operation... operation) { - Arrays.stream(operation) - .forEach(Operation::run); + private CompletionOperation wrap(Operation o) { + return result -> o.run(); + } + + private void run(Stream<CompletionOperation> operations) { + operations.forEach(operation -> operation.complete(this)); } } diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java index b34ac60..a75b9e7 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java +++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java @@ -22,8 +22,6 @@ package org.apache.james.task; import java.time.ZonedDateTime; import java.util.Optional; -import com.google.common.base.Preconditions; - public class TaskExecutionDetails { public interface AdditionalInformation { @@ -102,69 +100,93 @@ public class TaskExecutionDetails { } public TaskExecutionDetails start() { - Preconditions.checkState(status == TaskManager.Status.WAITING, "expected WAITING actual status is " + status); - return new TaskExecutionDetails( - taskId, - task, - TaskManager.Status.IN_PROGRESS, - submitDate, - Optional.of(ZonedDateTime.now()), - Optional.empty(), - Optional.empty(), - Optional.empty()); + switch (status) { + case WAITING: + return new TaskExecutionDetails( + taskId, + task, + TaskManager.Status.IN_PROGRESS, + submitDate, + Optional.of(ZonedDateTime.now()), + Optional.empty(), + Optional.empty(), + Optional.empty()); + default: + 
return this; + } } public TaskExecutionDetails completed() { - Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS); - return new TaskExecutionDetails( - taskId, - task, - TaskManager.Status.COMPLETED, - submitDate, - startedDate, - Optional.of(ZonedDateTime.now()), - Optional.empty(), - Optional.empty()); + switch (status) { + case IN_PROGRESS: + case CANCEL_REQUESTED: + case WAITING: + return new TaskExecutionDetails( + taskId, + task, + TaskManager.Status.COMPLETED, + submitDate, + startedDate, + Optional.of(ZonedDateTime.now()), + Optional.empty(), + Optional.empty()); + default: + return this; + } } public TaskExecutionDetails failed() { - Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS); - return new TaskExecutionDetails( - taskId, - task, - TaskManager.Status.FAILED, - submitDate, - startedDate, - Optional.empty(), - Optional.empty(), - Optional.of(ZonedDateTime.now())); + switch (status) { + case IN_PROGRESS: + case CANCEL_REQUESTED: + return new TaskExecutionDetails( + taskId, + task, + TaskManager.Status.FAILED, + submitDate, + startedDate, + Optional.empty(), + Optional.empty(), + Optional.of(ZonedDateTime.now())); + default: + return this; + } } public TaskExecutionDetails cancelRequested() { - Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS - || status == TaskManager.Status.WAITING); - return new TaskExecutionDetails( - taskId, - task, - TaskManager.Status.CANCEL_REQUESTED, - submitDate, - startedDate, - Optional.empty(), - Optional.of(ZonedDateTime.now()), - Optional.empty()); + switch (status) { + case IN_PROGRESS: + case WAITING: + return new TaskExecutionDetails( + taskId, + task, + TaskManager.Status.CANCEL_REQUESTED, + submitDate, + startedDate, + Optional.empty(), + Optional.of(ZonedDateTime.now()), + Optional.empty()); + default: + return this; + } } public TaskExecutionDetails cancelEffectively() { - Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS - || status == 
TaskManager.Status.WAITING || status == TaskManager.Status.CANCEL_REQUESTED); - return new TaskExecutionDetails( - taskId, - task, - TaskManager.Status.CANCELLED, - submitDate, - startedDate, - Optional.empty(), - Optional.of(ZonedDateTime.now()), - Optional.empty()); + switch (status) { + case CANCEL_REQUESTED: + case IN_PROGRESS: + case WAITING: + return new TaskExecutionDetails( + taskId, + task, + TaskManager.Status.CANCELLED, + submitDate, + startedDate, + Optional.empty(), + Optional.of(ZonedDateTime.now()), + Optional.empty()); + default: + return this; + } } } diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java index 8a806e6..cacc6c8 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java +++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java @@ -18,14 +18,16 @@ ****************************************************************/ package org.apache.james.task; +import java.io.Closeable; + import reactor.core.publisher.Mono; -public interface TaskManagerWorker { +public interface TaskManagerWorker extends Closeable { interface Listener { void started(); - void completed(); + void completed(Task.Result result); void failed(Throwable t); @@ -36,6 +38,5 @@ public interface TaskManagerWorker { Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener); - void cancelTask(TaskId id, Listener listener); - + void cancelTask(TaskId taskId); } diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java index 3484d2a..e2e7035 100644 --- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java +++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java @@ -21,81 +21,54 @@ package org.apache.james.task; import java.io.Closeable; import java.io.IOException; -import java.util.Set; -import 
java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Consumer; +import java.util.concurrent.LinkedBlockingQueue; -import com.google.common.collect.Sets; import reactor.core.Disposable; -import reactor.core.publisher.WorkQueueProcessor; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class WorkQueue implements Closeable { - public enum Status { - STARTED, - CANCELLED - } - - public static class Event { - public final TaskId id; - public final Status status; - - private Event(TaskId id, Status status) { - this.id = id; - this.status = status; - } - } - public static RequireWorker builder() { - return worker -> listener -> new WorkQueue(worker, listener); + return WorkQueue::new; } public interface RequireWorker { - RequireListener worker(Consumer<TaskWithId> worker); - } - - public interface RequireListener { - WorkQueue listener(Consumer<Event> worker); + WorkQueue worker(TaskManagerWorker worker); } - private final WorkQueueProcessor<TaskWithId> workQueue; - private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor(); - private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor(); - private final Set<TaskId> cancelledTasks; - private final Consumer<Event> listener; + private final TaskManagerWorker worker; private final Disposable subscription; - - private WorkQueue(Consumer<TaskWithId> worker, Consumer<Event> listener) { - this.listener = listener; - cancelledTasks = Sets.newConcurrentHashSet(); - workQueue = WorkQueueProcessor.<TaskWithId>builder() - .executor(taskExecutor) - .requestTaskExecutor(requestTaskExecutor) - .build(); - subscription = workQueue - .subscribeOn(Schedulers.single()) - .subscribe(dispatchNonCancelledTaskToWorker(worker)); + private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks; + + private 
WorkQueue(TaskManagerWorker worker) { + this.worker = worker; + this.tasks = new LinkedBlockingQueue<>(); + this.subscription = Mono.fromCallable(tasks::take) + .repeat() + .subscribeOn(Schedulers.elastic()) + .flatMapSequential(this::dispatchTaskToWorker) + .subscribe(); } - private Consumer<TaskWithId> dispatchNonCancelledTaskToWorker(Consumer<TaskWithId> delegate) { - return taskWithId -> { - if (!cancelledTasks.remove(taskWithId.getId())) { - listener.accept(new Event(taskWithId.getId(), Status.STARTED)); - delegate.accept(taskWithId); - } else { - listener.accept(new Event(taskWithId.getId(), Status.CANCELLED)); - } - }; + private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) { + TaskWithId taskWithId = tuple.getT1(); + TaskManagerWorker.Listener listener = tuple.getT2(); + return worker.executeTask(taskWithId, listener); } - public void submit(TaskWithId taskWithId) { - workQueue.onNext(taskWithId); + public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) { + try { + tasks.put(Tuples.of(taskWithId, listener)); + } catch (InterruptedException e) { + listener.cancelled(); + } } public void cancel(TaskId taskId) { - cancelledTasks.add(taskId); + worker.cancelTask(taskId); } @Override @@ -105,13 +78,7 @@ public class WorkQueue implements Closeable { } catch (Throwable ignore) { //avoid failing during close } - try { - workQueue.dispose(); - } catch (Throwable ignore) { - //avoid failing during close - } - taskExecutor.shutdownNow(); - requestTaskExecutor.shutdownNow(); + worker.close(); } } diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java similarity index 79% rename from server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java rename to server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 5d8a13f..77aaf88 100644 --- 
a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java +++ b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -25,16 +25,20 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import java.io.IOException; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; -class MemoryTaskManagerWorkerTest { +class SerialTaskManagerWorkerTest { - private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker(); + private final SerialTaskManagerWorker worker = new SerialTaskManagerWorker(); private final Task successfulTask = () -> Task.Result.COMPLETED; private final Task failedTask = () -> Task.Result.PARTIAL; @@ -42,17 +46,22 @@ class MemoryTaskManagerWorkerTest { throw new RuntimeException("Throwing Task"); }; + @AfterEach + void tearDown() throws IOException { + worker.close(); + } + @Test void aSuccessfullTaskShouldCompleteSuccessfully() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.successfulTask); TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - Mono<Task.Result> result = worker.executeTask(taskWithId, listener).cache(); + Mono<Task.Result> result = worker.executeTask(taskWithId, listener); assertThat(result.block()).isEqualTo(Task.Result.COMPLETED); - verify(listener, atLeastOnce()).completed(); + verify(listener, atLeastOnce()).completed(Task.Result.COMPLETED); } @Test @@ -60,7 +69,7 @@ class MemoryTaskManagerWorkerTest { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), failedTask); TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - Mono<Task.Result> result = worker.executeTask(taskWithId, 
listener).cache(); + Mono<Task.Result> result = worker.executeTask(taskWithId, listener); assertThat(result.block()).isEqualTo(Task.Result.PARTIAL); verify(listener, atLeastOnce()).failed(); @@ -71,14 +80,14 @@ class MemoryTaskManagerWorkerTest { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), throwingTask); TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - Mono<Task.Result> result = worker.executeTask(taskWithId, listener).cache(); + Mono<Task.Result> result = worker.executeTask(taskWithId, listener); assertThat(result.block()).isEqualTo(Task.Result.PARTIAL); verify(listener, atLeastOnce()).failed(any(RuntimeException.class)); } @Test - void theWorkerShouldReportThatATaskIsInProgress() { + void theWorkerShouldReportThatATaskIsInProgress() throws InterruptedException { TaskId id = TaskId.generateTaskId(); CountDownLatch latch = new CountDownLatch(1); CountDownLatch taskLaunched = new CountDownLatch(1); @@ -93,7 +102,7 @@ class MemoryTaskManagerWorkerTest { TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - worker.executeTask(taskWithId, listener).cache(); + worker.executeTask(taskWithId, listener).subscribe(); await(taskLaunched); verify(listener, atLeastOnce()).started(); @@ -102,7 +111,7 @@ class MemoryTaskManagerWorkerTest { } @Test - void theWorkerShouldCancelAnInProgressTask() { + void theWorkerShouldCancelAnInProgressTask() throws InterruptedException { TaskId id = TaskId.generateTaskId(); AtomicInteger counter = new AtomicInteger(0); CountDownLatch latch = new CountDownLatch(1); @@ -117,21 +126,22 @@ class MemoryTaskManagerWorkerTest { TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - worker.executeTask(taskWithId, listener).cache(); + Mono<Task.Result> resultMono = worker.executeTask(taskWithId, listener).cache(); + resultMono.subscribe(); - worker.cancelTask(id, listener); + Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() 
-> verify(listener, atLeastOnce()).started()); + + worker.cancelTask(id); + + resultMono.block(Duration.ofSeconds(10)); - verify(listener, atLeastOnce()).started(); verify(listener, atLeastOnce()).cancelled(); verifyNoMoreInteractions(listener); } - private void await(CountDownLatch countDownLatch) { - try { - countDownLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + private void await(CountDownLatch countDownLatch) throws InterruptedException { + countDownLatch.await(); } } diff --git a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java index af83a76..9c93da4 100644 --- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java +++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java @@ -3,8 +3,8 @@ package org.apache.james.task; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Duration.FIVE_SECONDS; import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; -import static org.awaitility.Duration.ONE_SECOND; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -15,6 +15,7 @@ import org.assertj.core.api.SoftAssertions; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.awaitility.core.ConditionFactory; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; public interface TaskManagerContract { @@ -26,7 +27,7 @@ public interface TaskManagerContract { .with() .pollDelay(slowPacedPollInterval) .await(); - ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND); + ConditionFactory awaitAtMostFiveSeconds = calmlyAwait.atMost(FIVE_SECONDS); TaskManager taskManager(); @@ -41,7 +42,7 @@ public interface TaskManagerContract { default void 
getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) { TaskManager taskManager = taskManager(); taskManager.submit(() -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); @@ -52,21 +53,21 @@ public interface TaskManagerContract { } @Test - default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) { + default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws InterruptedException { TaskManager taskManager = taskManager(); CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1); AtomicInteger count = new AtomicInteger(0); TaskId id = taskManager.submit(() -> { waitForTaskToBeLaunched.countDown(); - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); //We sleep to handover the CPU to the scheduler Thread.sleep(1); count.incrementAndGet(); return Task.Result.COMPLETED; }); - await(waitForTaskToBeLaunched); + waitForTaskToBeLaunched.await(); taskManager.cancel(id); assertThat(count.get()).isEqualTo(0); @@ -105,10 +106,10 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldBeCancelledWhenCancelled() { + default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(() -> { - sleep(500); + countDownLatch.await(); return Task.Result.COMPLETED; }); @@ -118,6 +119,8 @@ public interface TaskManagerContract { assertThat(taskManager.getExecutionDetails(id).getStatus()) .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED); + countDownLatch.countDown(); + awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager); assertThat(taskManager.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.CANCELLED); @@ -125,10 +128,10 @@ public interface TaskManagerContract { } @Test - default void aWaitingTaskShouldBeCancelled() { + default void 
aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(() -> { - sleep(500); + countDownLatch.await(); return Task.Result.COMPLETED; }); @@ -138,21 +141,21 @@ public interface TaskManagerContract { awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager); - assertThat(taskManager.getExecutionDetails(idTaskToCancel).getStatus()) .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED); + countDownLatch.countDown(); + awaitUntilTaskHasStatus(idTaskToCancel, TaskManager.Status.CANCELLED, taskManager); assertThat(taskManager.getExecutionDetails(idTaskToCancel).getStatus()) .isEqualTo(TaskManager.Status.CANCELLED); - } @Test default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(() -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager); @@ -165,7 +168,7 @@ public interface TaskManagerContract { default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(() -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); awaitUntilTaskHasStatus(taskId, TaskManager.Status.IN_PROGRESS, taskManager); @@ -210,7 +213,7 @@ public interface TaskManagerContract { TaskId inProgressId = taskManager.submit( () -> { latch1.countDown(); - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); TaskId waitingId = taskManager.submit( @@ -250,14 +253,14 @@ public interface TaskManagerContract { () -> Task.Result.COMPLETED); taskManager.submit( () -> { - await(latch1); + latch1.await(); latch2.countDown(); - 
waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); TaskId waitingId = taskManager.submit( () -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); latch2.countDown(); return Task.Result.COMPLETED; }); @@ -282,14 +285,14 @@ public interface TaskManagerContract { () -> Task.Result.COMPLETED); taskManager.submit( () -> { - await(latch1); + latch1.await(); latch2.countDown(); - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); taskManager.submit( () -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); latch2.countDown(); return Task.Result.COMPLETED; }); @@ -314,14 +317,14 @@ public interface TaskManagerContract { () -> Task.Result.COMPLETED); taskManager.submit( () -> { - await(latch1); + latch1.await(); latch2.countDown(); - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); taskManager.submit( () -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); latch2.countDown(); return Task.Result.COMPLETED; }); @@ -346,14 +349,14 @@ public interface TaskManagerContract { () -> Task.Result.COMPLETED); TaskId inProgressId = taskManager.submit( () -> { - await(latch1); + latch1.await(); latch2.countDown(); - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); return Task.Result.COMPLETED; }); taskManager.submit( () -> { - waitForResult(waitingForResultLatch); + waitingForResultLatch.await(); latch2.countDown(); return Task.Result.COMPLETED; }); @@ -396,7 +399,7 @@ public interface TaskManagerContract { CountDownLatch latch = new CountDownLatch(1); taskManager.submit( () -> { - await(latch); + latch.await(); return Task.Result.COMPLETED; }); latch.countDown(); @@ -413,24 +416,24 @@ public interface TaskManagerContract { taskManager.submit(() -> { queue.add(1); - sleep(50); + Thread.sleep(50); queue.add(2); return 
Task.Result.COMPLETED; }); taskManager.submit(() -> { queue.add(3); - sleep(50); + Thread.sleep(50); queue.add(4); return Task.Result.COMPLETED; }); taskManager.submit(() -> { queue.add(5); - sleep(50); + Thread.sleep(50); queue.add(6); return Task.Result.COMPLETED; }); - awaitAtMostOneSecond.until(() -> queue.contains(6)); + awaitAtMostFiveSeconds.until(() -> queue.contains(6)); assertThat(queue) .containsExactly(1, 2, 3, 4, 5, 6); @@ -458,27 +461,7 @@ public interface TaskManagerContract { .isEqualTo(TaskManager.Status.FAILED); } - default void sleep(int durationInMs) { - try { - Thread.sleep(durationInMs); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - default void await(CountDownLatch countDownLatch) { - try { - countDownLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - default void waitForResult(CountDownLatch waitingForResultLatch) { - await(waitingForResultLatch); - } - default void awaitUntilTaskHasStatus(TaskId id, TaskManager.Status status, TaskManager taskManager) { - awaitAtMostOneSecond.until(() -> taskManager.getExecutionDetails(id).getStatus().equals(status)); + awaitAtMostFiveSeconds.until(() -> taskManager.getExecutionDetails(id).getStatus(), Matchers.equalTo(status)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org