This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e3eb60  JAMES-2272 enforce responsibilities segregation for 
Worker/Manager/WorkQueue
8e3eb60 is described below

commit 8e3eb6098fe05a3da582c98fca67f62a90bac16f
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Tue Jul 2 11:33:16 2019 +0200

    JAMES-2272 enforce responsibilities segregation for Worker/Manager/WorkQueue
    
        The WorkQueue is only reponsible for submitting and canceling tasks
        The Worker manage the run of a task
---
 .../org/apache/james/task/MemoryTaskManager.java   |  61 ++-------
 .../apache/james/task/MemoryTaskManagerWorker.java | 107 ---------------
 .../apache/james/task/SerialTaskManagerWorker.java | 144 +++++++++++++++++++++
 .../src/main/java/org/apache/james/task/Task.java  |  28 +++-
 .../apache/james/task/TaskExecutionDetails.java    | 130 +++++++++++--------
 .../org/apache/james/task/TaskManagerWorker.java   |   9 +-
 .../main/java/org/apache/james/task/WorkQueue.java |  91 +++++--------
 ...rTest.java => SerialTaskManagerWorkerTest.java} |  46 ++++---
 .../org/apache/james/task/TaskManagerContract.java |  89 ++++++-------
 9 files changed, 350 insertions(+), 355 deletions(-)

diff --git 
a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java 
b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 39d7c35..6b4574f 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -24,9 +24,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import javax.annotation.PreDestroy;
@@ -34,7 +32,6 @@ import javax.annotation.PreDestroy;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
@@ -53,8 +50,9 @@ public class MemoryTaskManager implements TaskManager {
         }
 
         @Override
-        public void completed() {
+        public void completed(Task.Result result) {
             updater.accept(TaskExecutionDetails::completed);
+
         }
 
         @Override
@@ -78,47 +76,19 @@ public class MemoryTaskManager implements TaskManager {
     private final WorkQueue workQueue;
     private final TaskManagerWorker worker;
     private final ConcurrentHashMap<TaskId, TaskExecutionDetails> 
idToExecutionDetails;
-    private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult;
 
     public MemoryTaskManager() {
 
         idToExecutionDetails = new ConcurrentHashMap<>();
-        tasksResult = new ConcurrentHashMap<>();
-        worker = new MemoryTaskManagerWorker();
-        workQueue = WorkQueue.builder()
-            .worker(this::treatTask)
-            .listener(this::listenToWorkQueueEvents);
-    }
-
-    private void listenToWorkQueueEvents(WorkQueue.Event event) {
-        switch (event.status) {
-            case CANCELLED:
-                
updateDetails(event.id).accept(TaskExecutionDetails::cancelEffectively);
-                break;
-            case STARTED:
-                break;
-        }
-    }
-
-    private void treatTask(TaskWithId task) {
-        DetailsUpdater detailsUpdater = updater(task.getId());
-        Mono<Task.Result> result = worker.executeTask(task, detailsUpdater);
-        tasksResult.put(task.getId(), result);
-        try {
-            BiConsumer<Throwable, Object> ignoreException = (t, o) -> { };
-            result
-                .onErrorContinue(InterruptedException.class, ignoreException)
-                .block();
-        } catch (CancellationException e) {
-            // Do not throw CancellationException
-        }
+        worker = new SerialTaskManagerWorker();
+        workQueue = WorkQueue.builder().worker(worker);
     }
 
     public TaskId submit(Task task) {
         TaskId taskId = TaskId.generateTaskId();
         TaskExecutionDetails executionDetails = 
TaskExecutionDetails.from(task, taskId);
         idToExecutionDetails.put(taskId, executionDetails);
-        workQueue.submit(new TaskWithId(taskId, task));
+        workQueue.submit(new TaskWithId(taskId, task), updater(taskId));
         return taskId;
     }
 
@@ -148,11 +118,8 @@ public class MemoryTaskManager implements TaskManager {
     @Override
     public void cancel(TaskId id) {
         Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> 
{
-                if (details.getStatus().equals(Status.WAITING)) {
-                    
updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
-                }
+                
updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
                 workQueue.cancel(id);
-                worker.cancelTask(id, updater(id));
             }
         );
     }
@@ -161,18 +128,10 @@ public class MemoryTaskManager implements TaskManager {
     public TaskExecutionDetails await(TaskId id) {
         if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) {
             return Flux.interval(NOW, AWAIT_POLLING_DURATION, 
Schedulers.elastic())
-                .filter(ignore -> tasksResult.get(id) != null)
-                .map(ignore -> {
-                    Optional.ofNullable(tasksResult.get(id))
-                        .ifPresent(mono -> {
-                            try {
-                                mono.block();
-                            } catch (CancellationException e) {
-                                // ignore
-                            }
-                        });
-                    return getExecutionDetails(id);
-                })
+                .map(ignored -> getExecutionDetails(id))
+                .filter(details -> details.getStatus() == Status.COMPLETED
+                    || details.getStatus() == Status.FAILED
+                    || details.getStatus() == Status.CANCELLED)
                 .take(1)
                 .blockFirst();
         } else {
diff --git 
a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java 
b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
deleted file mode 100644
index 4dda777..0000000
--- 
a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.task;
-
-import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.james.util.FunctionalUtils;
-import org.apache.james.util.MDCBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public class MemoryTaskManagerWorker implements TaskManagerWorker {
-
-    private static final boolean INTERRUPT_IF_RUNNING = true;
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryTaskManagerWorker.class);
-    private static final Duration CHECK_CANCELED_PERIOD = 
Duration.ofMillis(100);
-    private static final int FIRST = 1;
-
-    private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> 
idToFuture = new ConcurrentHashMap<>();
-
-    @Override
-    public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener 
listener) {
-        CompletableFuture<Task.Result> futureResult = 
CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener));
-
-        idToFuture.put(taskWithId.getId(), futureResult);
-
-        return Mono.fromFuture(futureResult)
-            .doOnError(res -> {
-                if (!(res instanceof CancellationException)) {
-                    listener.failed(res);
-                    LOGGER.error("Task was partially performed. Check logs for 
more details", res);
-                }
-            })
-            .doOnTerminate(() -> idToFuture.remove(taskWithId.getId()));
-    }
-
-    private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) {
-        return MDCBuilder.withMdc(
-            MDCBuilder.create()
-                .addContext(Task.TASK_ID, taskWithId.getId())
-                .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
-                .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
-            () -> run(taskWithId, listener));
-    }
-
-
-    private Task.Result run(TaskWithId taskWithId, Listener listener) {
-        listener.started();
-        try {
-            return taskWithId.getTask()
-                .run()
-                .onComplete(listener::completed)
-                .onFailure(() -> {
-                    LOGGER.error("Task was partially performed. Check logs for 
more details. Taskid : " + taskWithId.getId());
-                    listener.failed();
-                });
-        } catch (Exception e) {
-            LOGGER.error("Error while running task {}", taskWithId.getId(), e);
-            listener.failed(e);
-            return Task.Result.PARTIAL;
-        }
-    }
-
-    @Override
-    public void cancelTask(TaskId id, Listener listener) {
-        Optional.ofNullable(idToFuture.remove(id))
-            .ifPresent(future -> {
-                requestCancellation(future);
-                waitUntilFutureIsCancelled(future).blockFirst();
-                listener.cancelled();
-            });
-    }
-
-    private void requestCancellation(CompletableFuture<Task.Result> future) {
-        future.cancel(INTERRUPT_IF_RUNNING);
-    }
-
-    private Flux<Boolean> 
waitUntilFutureIsCancelled(CompletableFuture<Task.Result> future) {
-        return Flux.interval(CHECK_CANCELED_PERIOD)
-            .map(ignore -> future.isCancelled())
-            .filter(FunctionalUtils.identityPredicate())
-            .take(FIRST);
-    }
-}
diff --git 
a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java 
b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
new file mode 100644
index 0000000..d690be2
--- /dev/null
+++ 
b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -0,0 +1,144 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.task;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+public class SerialTaskManagerWorker implements TaskManagerWorker {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SerialTaskManagerWorker.class);
+    private final ExecutorService taskExecutor;
+    private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
+    private final Semaphore semaphore;
+    private final Set<TaskId> cancelledTasks;
+
+    SerialTaskManagerWorker() {
+        this.taskExecutor = 
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
+        this.cancelledTasks = Sets.newConcurrentHashSet();
+        this.runningTask = new AtomicReference<>();
+        this.semaphore = new Semaphore(1);
+    }
+
+    @Override
+    public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener 
listener) {
+            return Mono
+                .using(
+                    acquireSemaphore(taskWithId, listener),
+                    executeWithSemaphore(taskWithId, listener),
+                    Semaphore::release);
+
+    }
+
+    private Callable<Semaphore> acquireSemaphore(TaskWithId taskWithId, 
Listener listener) {
+        return () -> {
+                try {
+                    semaphore.acquire();
+                    return semaphore;
+                } catch (InterruptedException e) {
+                    listener.cancelled();
+                    throw e;
+                }
+            };
+    }
+
+    private Function<Semaphore, Mono<Task.Result>> 
executeWithSemaphore(TaskWithId taskWithId, Listener listener) {
+        return semaphore -> {
+            if (!cancelledTasks.remove(taskWithId.getId())) {
+                CompletableFuture<Task.Result> future = 
CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), 
taskExecutor);
+                runningTask.set(Tuples.of(taskWithId.getId(), future));
+
+                return Mono.fromFuture(future)
+                        .doOnError(exception -> {
+                            if (exception instanceof CancellationException) {
+                                listener.cancelled();
+                            } else {
+                                listener.failed(exception);
+                            }
+                        })
+                        .onErrorReturn(Task.Result.PARTIAL);
+            } else {
+                listener.cancelled();
+                return Mono.empty();
+            }
+        };
+    }
+
+    private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) {
+        return MDCBuilder.withMdc(
+            MDCBuilder.create()
+                .addContext(Task.TASK_ID, taskWithId.getId())
+                .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
+                .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
+            () -> run(taskWithId, listener));
+    }
+
+    private Task.Result run(TaskWithId taskWithId, Listener listener) {
+        listener.started();
+        try {
+            return taskWithId.getTask()
+                .run()
+                .onComplete(listener::completed)
+                .onFailure(() -> {
+                    LOGGER.error("Task was partially performed. Check logs for 
more details. Taskid : " + taskWithId.getId());
+                    listener.failed();
+                });
+        } catch (InterruptedException e) {
+            listener.cancelled();
+            return Task.Result.PARTIAL;
+        } catch (Exception e) {
+            LOGGER.error("Error while running task {}", taskWithId.getId(), e);
+            listener.failed(e);
+            return Task.Result.PARTIAL;
+        }
+    }
+
+    @Override
+    public void cancelTask(TaskId taskId) {
+        cancelledTasks.add(taskId);
+        Optional.ofNullable(runningTask.get())
+            .filter(task -> task.getT1().equals(taskId))
+            .ifPresent(task -> task.getT2().cancel(true));
+    }
+
+    @Override
+    public void close() throws IOException {
+        taskExecutor.shutdownNow();
+    }
+}
diff --git a/server/task/src/main/java/org/apache/james/task/Task.java 
b/server/task/src/main/java/org/apache/james/task/Task.java
index 0c79ec0..2edb39d 100644
--- a/server/task/src/main/java/org/apache/james/task/Task.java
+++ b/server/task/src/main/java/org/apache/james/task/Task.java
@@ -21,6 +21,7 @@ package org.apache.james.task;
 
 import java.util.Arrays;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,11 +33,23 @@ public interface Task {
         void run();
     }
 
+    interface CompletionOperation {
+        void complete(Result result);
+    }
+
     enum Result {
         COMPLETED,
         PARTIAL;
 
-        public Result onComplete(Operation... operation) {
+        public Result onComplete(Operation... operations) {
+            return onComplete(Arrays.stream(operations).map(this::wrap));
+        }
+
+        public Result onComplete(CompletionOperation... operation) {
+            return onComplete(Arrays.stream(operation));
+        }
+
+        private Result onComplete(Stream<CompletionOperation> operation) {
             try {
                 if (this == COMPLETED) {
                     run(operation);
@@ -48,16 +61,19 @@ public interface Task {
             }
         }
 
-        public Result onFailure(Operation... operation) {
+        public Result onFailure(Operation... operations) {
             if (this == PARTIAL) {
-                run(operation);
+                run(Arrays.stream(operations).map(this::wrap));
             }
             return this;
         }
 
-        private void run(Operation... operation) {
-            Arrays.stream(operation)
-                .forEach(Operation::run);
+        private CompletionOperation wrap(Operation o) {
+            return result -> o.run();
+        }
+
+        private void run(Stream<CompletionOperation> operations) {
+            operations.forEach(operation -> operation.complete(this));
         }
     }
 
diff --git 
a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java 
b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
index b34ac60..a75b9e7 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java
@@ -22,8 +22,6 @@ package org.apache.james.task;
 import java.time.ZonedDateTime;
 import java.util.Optional;
 
-import com.google.common.base.Preconditions;
-
 public class TaskExecutionDetails {
 
     public interface AdditionalInformation {
@@ -102,69 +100,93 @@ public class TaskExecutionDetails {
     }
 
     public TaskExecutionDetails start() {
-        Preconditions.checkState(status == TaskManager.Status.WAITING, 
"expected WAITING actual status is " + status);
-        return new TaskExecutionDetails(
-            taskId,
-            task,
-            TaskManager.Status.IN_PROGRESS,
-            submitDate,
-            Optional.of(ZonedDateTime.now()),
-            Optional.empty(),
-            Optional.empty(),
-            Optional.empty());
+        switch (status) {
+            case WAITING:
+                return new TaskExecutionDetails(
+                    taskId,
+                    task,
+                    TaskManager.Status.IN_PROGRESS,
+                    submitDate,
+                    Optional.of(ZonedDateTime.now()),
+                    Optional.empty(),
+                    Optional.empty(),
+                    Optional.empty());
+            default:
+                return this;
+        }
     }
 
     public TaskExecutionDetails completed() {
-        Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS);
-        return new TaskExecutionDetails(
-            taskId,
-            task,
-            TaskManager.Status.COMPLETED,
-            submitDate,
-            startedDate,
-            Optional.of(ZonedDateTime.now()),
-            Optional.empty(),
-            Optional.empty());
+        switch (status) {
+            case IN_PROGRESS:
+            case CANCEL_REQUESTED:
+            case WAITING:
+                return new TaskExecutionDetails(
+                    taskId,
+                    task,
+                    TaskManager.Status.COMPLETED,
+                    submitDate,
+                    startedDate,
+                    Optional.of(ZonedDateTime.now()),
+                    Optional.empty(),
+                    Optional.empty());
+            default:
+                return this;
+        }
     }
 
     public TaskExecutionDetails failed() {
-        Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS);
-        return new TaskExecutionDetails(
-            taskId,
-            task,
-            TaskManager.Status.FAILED,
-            submitDate,
-            startedDate,
-            Optional.empty(),
-            Optional.empty(),
-            Optional.of(ZonedDateTime.now()));
+        switch (status) {
+            case IN_PROGRESS:
+            case CANCEL_REQUESTED:
+                return new TaskExecutionDetails(
+                    taskId,
+                    task,
+                    TaskManager.Status.FAILED,
+                    submitDate,
+                    startedDate,
+                    Optional.empty(),
+                    Optional.empty(),
+                    Optional.of(ZonedDateTime.now()));
+            default:
+                return this;
+        }
     }
 
     public TaskExecutionDetails cancelRequested() {
-        Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
-            || status == TaskManager.Status.WAITING);
-        return new TaskExecutionDetails(
-            taskId,
-            task,
-            TaskManager.Status.CANCEL_REQUESTED,
-            submitDate,
-            startedDate,
-            Optional.empty(),
-            Optional.of(ZonedDateTime.now()),
-            Optional.empty());
+        switch (status) {
+            case IN_PROGRESS:
+            case WAITING:
+                return new TaskExecutionDetails(
+                    taskId,
+                    task,
+                    TaskManager.Status.CANCEL_REQUESTED,
+                    submitDate,
+                    startedDate,
+                    Optional.empty(),
+                    Optional.of(ZonedDateTime.now()),
+                    Optional.empty());
+            default:
+                return this;
+        }
     }
 
     public TaskExecutionDetails cancelEffectively() {
-        Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS
-            || status == TaskManager.Status.WAITING || status == 
TaskManager.Status.CANCEL_REQUESTED);
-        return new TaskExecutionDetails(
-            taskId,
-            task,
-            TaskManager.Status.CANCELLED,
-            submitDate,
-            startedDate,
-            Optional.empty(),
-            Optional.of(ZonedDateTime.now()),
-            Optional.empty());
+        switch (status) {
+            case CANCEL_REQUESTED:
+            case IN_PROGRESS:
+            case WAITING:
+                return new TaskExecutionDetails(
+                    taskId,
+                    task,
+                    TaskManager.Status.CANCELLED,
+                    submitDate,
+                    startedDate,
+                    Optional.empty(),
+                    Optional.of(ZonedDateTime.now()),
+                    Optional.empty());
+            default:
+                return this;
+        }
     }
 }
diff --git 
a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java 
b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
index 8a806e6..cacc6c8 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -18,14 +18,16 @@
  ****************************************************************/
 package org.apache.james.task;
 
+import java.io.Closeable;
+
 import reactor.core.publisher.Mono;
 
-public interface TaskManagerWorker {
+public interface TaskManagerWorker extends Closeable {
 
     interface Listener {
         void started();
 
-        void completed();
+        void completed(Task.Result result);
 
         void failed(Throwable t);
 
@@ -36,6 +38,5 @@ public interface TaskManagerWorker {
 
     Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener);
 
-    void cancelTask(TaskId id, Listener listener);
-
+    void cancelTask(TaskId taskId);
 }
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java 
b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
index 3484d2a..e2e7035 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -21,81 +21,54 @@ package org.apache.james.task;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import com.google.common.collect.Sets;
 import reactor.core.Disposable;
-import reactor.core.publisher.WorkQueueProcessor;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 public class WorkQueue implements Closeable {
 
-    public enum Status {
-        STARTED,
-        CANCELLED
-    }
-
-    public static class Event {
-        public final TaskId id;
-        public final Status status;
-
-        private Event(TaskId id, Status status) {
-            this.id = id;
-            this.status = status;
-        }
-    }
-
     public static RequireWorker builder() {
-        return worker -> listener -> new WorkQueue(worker, listener);
+        return WorkQueue::new;
     }
 
     public interface RequireWorker {
-        RequireListener worker(Consumer<TaskWithId> worker);
-    }
-
-    public interface RequireListener {
-        WorkQueue listener(Consumer<Event> worker);
+        WorkQueue worker(TaskManagerWorker worker);
     }
 
-    private final WorkQueueProcessor<TaskWithId> workQueue;
-    private final ExecutorService taskExecutor = 
Executors.newSingleThreadExecutor();
-    private final ExecutorService requestTaskExecutor = 
Executors.newSingleThreadExecutor();
-    private final Set<TaskId> cancelledTasks;
-    private final Consumer<Event> listener;
+    private final TaskManagerWorker worker;
     private final Disposable subscription;
-
-    private WorkQueue(Consumer<TaskWithId> worker, Consumer<Event> listener) {
-        this.listener = listener;
-        cancelledTasks = Sets.newConcurrentHashSet();
-        workQueue = WorkQueueProcessor.<TaskWithId>builder()
-            .executor(taskExecutor)
-            .requestTaskExecutor(requestTaskExecutor)
-            .build();
-        subscription = workQueue
-            .subscribeOn(Schedulers.single())
-            .subscribe(dispatchNonCancelledTaskToWorker(worker));
+    private final LinkedBlockingQueue<Tuple2<TaskWithId, 
TaskManagerWorker.Listener>> tasks;
+
+    private WorkQueue(TaskManagerWorker worker) {
+        this.worker = worker;
+        this.tasks = new LinkedBlockingQueue<>();
+        this.subscription = Mono.fromCallable(tasks::take)
+            .repeat()
+            .subscribeOn(Schedulers.elastic())
+            .flatMapSequential(this::dispatchTaskToWorker)
+            .subscribe();
     }
 
-    private Consumer<TaskWithId> 
dispatchNonCancelledTaskToWorker(Consumer<TaskWithId> delegate) {
-        return taskWithId -> {
-            if (!cancelledTasks.remove(taskWithId.getId())) {
-                listener.accept(new Event(taskWithId.getId(), Status.STARTED));
-                delegate.accept(taskWithId);
-            } else {
-                listener.accept(new Event(taskWithId.getId(), 
Status.CANCELLED));
-            }
-        };
+    private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, 
TaskManagerWorker.Listener> tuple) {
+        TaskWithId taskWithId = tuple.getT1();
+        TaskManagerWorker.Listener listener = tuple.getT2();
+        return worker.executeTask(taskWithId, listener);
     }
 
-    public void submit(TaskWithId taskWithId) {
-        workQueue.onNext(taskWithId);
+    public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener 
listener) {
+        try {
+            tasks.put(Tuples.of(taskWithId, listener));
+        } catch (InterruptedException e) {
+            listener.cancelled();
+        }
     }
 
     public void cancel(TaskId taskId) {
-        cancelledTasks.add(taskId);
+        worker.cancelTask(taskId);
     }
 
     @Override
@@ -105,13 +78,7 @@ public class WorkQueue implements Closeable {
         } catch (Throwable ignore) {
             //avoid failing during close
         }
-        try {
-            workQueue.dispose();
-        } catch (Throwable ignore) {
-            //avoid failing during close
-        }
-        taskExecutor.shutdownNow();
-        requestTaskExecutor.shutdownNow();
+        worker.close();
     }
 
 }
diff --git 
a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
 
b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
similarity index 79%
rename from 
server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
rename to 
server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 5d8a13f..77aaf88 100644
--- 
a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java
+++ 
b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -25,16 +25,20 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
+import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import reactor.core.publisher.Mono;
 
-class MemoryTaskManagerWorkerTest {
+class SerialTaskManagerWorkerTest {
 
-    private final MemoryTaskManagerWorker worker = new 
MemoryTaskManagerWorker();
+    private final SerialTaskManagerWorker worker = new 
SerialTaskManagerWorker();
 
     private final Task successfulTask = () -> Task.Result.COMPLETED;
     private final Task failedTask = () -> Task.Result.PARTIAL;
@@ -42,17 +46,22 @@ class MemoryTaskManagerWorkerTest {
         throw new RuntimeException("Throwing Task");
     };
 
+    @AfterEach
+    void tearDown() throws IOException {
+        worker.close();
+    }
+
     @Test
     void aSuccessfullTaskShouldCompleteSuccessfully() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), 
this.successfulTask);
 
         TaskManagerWorker.Listener listener = 
mock(TaskManagerWorker.Listener.class);
 
-        Mono<Task.Result> result = worker.executeTask(taskWithId, 
listener).cache();
+        Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
 
         assertThat(result.block()).isEqualTo(Task.Result.COMPLETED);
 
-        verify(listener, atLeastOnce()).completed();
+        verify(listener, atLeastOnce()).completed(Task.Result.COMPLETED);
     }
 
     @Test
@@ -60,7 +69,7 @@ class MemoryTaskManagerWorkerTest {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), 
failedTask);
         TaskManagerWorker.Listener listener = 
mock(TaskManagerWorker.Listener.class);
 
-        Mono<Task.Result> result = worker.executeTask(taskWithId, 
listener).cache();
+        Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
 
         assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
         verify(listener, atLeastOnce()).failed();
@@ -71,14 +80,14 @@ class MemoryTaskManagerWorkerTest {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), 
throwingTask);
         TaskManagerWorker.Listener listener = 
mock(TaskManagerWorker.Listener.class);
 
-        Mono<Task.Result> result = worker.executeTask(taskWithId, 
listener).cache();
+        Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
 
         assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
         verify(listener, atLeastOnce()).failed(any(RuntimeException.class));
     }
 
     @Test
-    void theWorkerShouldReportThatATaskIsInProgress() {
+    void theWorkerShouldReportThatATaskIsInProgress() throws 
InterruptedException {
         TaskId id = TaskId.generateTaskId();
         CountDownLatch latch = new CountDownLatch(1);
         CountDownLatch taskLaunched = new CountDownLatch(1);
@@ -93,7 +102,7 @@ class MemoryTaskManagerWorkerTest {
 
         TaskManagerWorker.Listener listener = 
mock(TaskManagerWorker.Listener.class);
 
-        worker.executeTask(taskWithId, listener).cache();
+        worker.executeTask(taskWithId, listener).subscribe();
 
         await(taskLaunched);
         verify(listener, atLeastOnce()).started();
@@ -102,7 +111,7 @@ class MemoryTaskManagerWorkerTest {
     }
 
     @Test
-    void theWorkerShouldCancelAnInProgressTask() {
+    void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
         TaskId id = TaskId.generateTaskId();
         AtomicInteger counter = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
@@ -117,21 +126,22 @@ class MemoryTaskManagerWorkerTest {
 
         TaskManagerWorker.Listener listener = 
mock(TaskManagerWorker.Listener.class);
 
-        worker.executeTask(taskWithId, listener).cache();
+        Mono<Task.Result> resultMono = worker.executeTask(taskWithId, 
listener).cache();
+        resultMono.subscribe();
 
-        worker.cancelTask(id, listener);
+        Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
+            .untilAsserted(() -> verify(listener, atLeastOnce()).started());
+
+        worker.cancelTask(id);
+
+        resultMono.block(Duration.ofSeconds(10));
 
-        verify(listener, atLeastOnce()).started();
         verify(listener, atLeastOnce()).cancelled();
         verifyNoMoreInteractions(listener);
     }
 
 
-    private void await(CountDownLatch countDownLatch) {
-        try {
-            countDownLatch.await();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+    private void await(CountDownLatch countDownLatch) throws 
InterruptedException {
+        countDownLatch.await();
     }
 }
diff --git 
a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java 
b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
index af83a76..9c93da4 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -3,8 +3,8 @@ package org.apache.james.task;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Duration.FIVE_SECONDS;
 import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
-import static org.awaitility.Duration.ONE_SECOND;
 
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -15,6 +15,7 @@ import org.assertj.core.api.SoftAssertions;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.awaitility.core.ConditionFactory;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Test;
 
 public interface TaskManagerContract {
@@ -26,7 +27,7 @@ public interface TaskManagerContract {
         .with()
         .pollDelay(slowPacedPollInterval)
         .await();
-    ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND);
+    ConditionFactory awaitAtMostFiveSeconds = calmlyAwait.atMost(FIVE_SECONDS);
 
     TaskManager taskManager();
 
@@ -41,7 +42,7 @@ public interface TaskManagerContract {
     default void 
getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch 
waitingForResultLatch) {
         TaskManager taskManager = taskManager();
         taskManager.submit(() -> {
-            waitForResult(waitingForResultLatch);
+            waitingForResultLatch.await();
             return Task.Result.COMPLETED;
         });
 
@@ -52,21 +53,21 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void taskCodeAfterCancelIsNotRun(CountDownLatch 
waitingForResultLatch) {
+    default void taskCodeAfterCancelIsNotRun(CountDownLatch 
waitingForResultLatch) throws InterruptedException {
         TaskManager taskManager = taskManager();
         CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1);
         AtomicInteger count = new AtomicInteger(0);
 
         TaskId id = taskManager.submit(() -> {
             waitForTaskToBeLaunched.countDown();
-            waitForResult(waitingForResultLatch);
+            waitingForResultLatch.await();
             //We sleep to handover the CPU to the scheduler
             Thread.sleep(1);
             count.incrementAndGet();
             return Task.Result.COMPLETED;
         });
 
-        await(waitForTaskToBeLaunched);
+        waitForTaskToBeLaunched.await();
         taskManager.cancel(id);
 
         assertThat(count.get()).isEqualTo(0);
@@ -105,10 +106,10 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void getStatusShouldBeCancelledWhenCancelled() {
+    default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch 
countDownLatch) {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(() -> {
-            sleep(500);
+            countDownLatch.await();
             return Task.Result.COMPLETED;
         });
 
@@ -118,6 +119,8 @@ public interface TaskManagerContract {
         assertThat(taskManager.getExecutionDetails(id).getStatus())
             .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED);
 
+        countDownLatch.countDown();
+
         awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, taskManager);
         assertThat(taskManager.getExecutionDetails(id).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
@@ -125,10 +128,10 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void aWaitingTaskShouldBeCancelled() {
+    default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(() -> {
-            sleep(500);
+            countDownLatch.await();
             return Task.Result.COMPLETED;
         });
 
@@ -138,21 +141,21 @@ public interface TaskManagerContract {
 
         awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, 
taskManager);
 
-
         assertThat(taskManager.getExecutionDetails(idTaskToCancel).getStatus())
             .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED);
 
+        countDownLatch.countDown();
+
         awaitUntilTaskHasStatus(idTaskToCancel, TaskManager.Status.CANCELLED, 
taskManager);
         assertThat(taskManager.getExecutionDetails(idTaskToCancel).getStatus())
             .isEqualTo(TaskManager.Status.CANCELLED);
-
     }
 
     @Test
     default void cancelShouldBeIdempotent(CountDownLatch 
waitingForResultLatch) {
         TaskManager taskManager = taskManager();
         TaskId id = taskManager.submit(() -> {
-            waitForResult(waitingForResultLatch);
+            waitingForResultLatch.await();
             return Task.Result.COMPLETED;
         });
         awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, 
taskManager);
@@ -165,7 +168,7 @@ public interface TaskManagerContract {
     default void 
getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch 
waitingForResultLatch) {
         TaskManager taskManager = taskManager();
         TaskId taskId = taskManager.submit(() -> {
-            waitForResult(waitingForResultLatch);
+            waitingForResultLatch.await();
             return Task.Result.COMPLETED;
         });
         awaitUntilTaskHasStatus(taskId, TaskManager.Status.IN_PROGRESS, 
taskManager);
@@ -210,7 +213,7 @@ public interface TaskManagerContract {
         TaskId inProgressId = taskManager.submit(
             () -> {
                 latch1.countDown();
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 return Task.Result.COMPLETED;
             });
         TaskId waitingId = taskManager.submit(
@@ -250,14 +253,14 @@ public interface TaskManagerContract {
             () -> Task.Result.COMPLETED);
         taskManager.submit(
             () -> {
-                await(latch1);
+                latch1.await();
                 latch2.countDown();
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 return Task.Result.COMPLETED;
             });
         TaskId waitingId = taskManager.submit(
             () -> {
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 latch2.countDown();
                 return Task.Result.COMPLETED;
             });
@@ -282,14 +285,14 @@ public interface TaskManagerContract {
             () -> Task.Result.COMPLETED);
         taskManager.submit(
             () -> {
-                await(latch1);
+                latch1.await();
                 latch2.countDown();
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 return Task.Result.COMPLETED;
             });
         taskManager.submit(
             () -> {
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 latch2.countDown();
                 return Task.Result.COMPLETED;
             });
@@ -314,14 +317,14 @@ public interface TaskManagerContract {
             () -> Task.Result.COMPLETED);
         taskManager.submit(
             () -> {
-                await(latch1);
+                latch1.await();
                 latch2.countDown();
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 return Task.Result.COMPLETED;
             });
         taskManager.submit(
             () -> {
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 latch2.countDown();
                 return Task.Result.COMPLETED;
             });
@@ -346,14 +349,14 @@ public interface TaskManagerContract {
             () -> Task.Result.COMPLETED);
         TaskId inProgressId = taskManager.submit(
             () -> {
-                await(latch1);
+                latch1.await();
                 latch2.countDown();
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 return Task.Result.COMPLETED;
             });
         taskManager.submit(
             () -> {
-                waitForResult(waitingForResultLatch);
+                waitingForResultLatch.await();
                 latch2.countDown();
                 return Task.Result.COMPLETED;
             });
@@ -396,7 +399,7 @@ public interface TaskManagerContract {
         CountDownLatch latch = new CountDownLatch(1);
         taskManager.submit(
             () -> {
-                await(latch);
+                latch.await();
                 return Task.Result.COMPLETED;
             });
         latch.countDown();
@@ -413,24 +416,24 @@ public interface TaskManagerContract {
 
         taskManager.submit(() -> {
             queue.add(1);
-            sleep(50);
+            Thread.sleep(50);
             queue.add(2);
             return Task.Result.COMPLETED;
         });
         taskManager.submit(() -> {
             queue.add(3);
-            sleep(50);
+            Thread.sleep(50);
             queue.add(4);
             return Task.Result.COMPLETED;
         });
         taskManager.submit(() -> {
             queue.add(5);
-            sleep(50);
+            Thread.sleep(50);
             queue.add(6);
             return Task.Result.COMPLETED;
         });
 
-        awaitAtMostOneSecond.until(() -> queue.contains(6));
+        awaitAtMostFiveSeconds.until(() -> queue.contains(6));
 
         assertThat(queue)
             .containsExactly(1, 2, 3, 4, 5, 6);
@@ -458,27 +461,7 @@ public interface TaskManagerContract {
             .isEqualTo(TaskManager.Status.FAILED);
     }
 
-    default void sleep(int durationInMs) {
-        try {
-            Thread.sleep(durationInMs);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    default void await(CountDownLatch countDownLatch) {
-        try {
-            countDownLatch.await();
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    default void waitForResult(CountDownLatch waitingForResultLatch) {
-        await(waitingForResultLatch);
-    }
-
     default void awaitUntilTaskHasStatus(TaskId id, TaskManager.Status status, 
TaskManager taskManager) {
-        awaitAtMostOneSecond.until(() -> 
taskManager.getExecutionDetails(id).getStatus().equals(status));
+        awaitAtMostFiveSeconds.until(() -> 
taskManager.getExecutionDetails(id).getStatus(), Matchers.equalTo(status));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to