This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1f232d46e0c7ffc56b988afb0bf38e5bf1cbdcb8 Author: Rémi Kowalski <[email protected]> AuthorDate: Tue May 21 11:43:16 2019 +0200 JAMES-2777 make the MemoryTaskManager deleguate the execution of the tasks to its worker --- server/task/pom.xml | 5 + .../org/apache/james/task/MemoryTaskManager.java | 154 ++++++++++----------- .../apache/james/task/MemoryTaskManagerTest.java | 75 ++++++---- 3 files changed, 128 insertions(+), 106 deletions(-) diff --git a/server/task/pom.xml b/server/task/pom.xml index 32a4647..3ade1a7 100644 --- a/server/task/pom.xml +++ b/server/task/pom.xml @@ -36,6 +36,11 @@ <artifactId>james-server-util</artifactId> </dependency> <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index e26e173..1623858 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -19,102 +19,69 @@ package org.apache.james.task; +import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; import javax.annotation.PreDestroy; -import org.apache.james.util.MDCBuilder; -import org.apache.james.util.concurrent.NamedThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.WorkQueueProcessor; +import reactor.core.scheduler.Schedulers; public class MemoryTaskManager implements TaskManager { - private static final boolean INTERRUPT_IF_RUNNING = true; - private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class); - + public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500); + public static final Duration NOW = Duration.ZERO; + private final WorkQueueProcessor<TaskWithId> workQueue; + private final TaskManagerWorker worker; private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails; - private final ConcurrentHashMap<TaskId, Future<?>> idToFuture; - private final ExecutorService executor; + private final ConcurrentHashMap<TaskId, Mono<Task.Result>> tasksResult; + private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService requestTaskExecutor = Executors.newSingleThreadExecutor(); public MemoryTaskManager() { + workQueue = WorkQueueProcessor.<TaskWithId>builder() + .executor(taskExecutor) + .requestTaskExecutor(requestTaskExecutor) + .build(); idToExecutionDetails = new ConcurrentHashMap<>(); - idToFuture = new ConcurrentHashMap<>(); - ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass()); - executor = Executors.newSingleThreadExecutor(threadFactory); + tasksResult = new ConcurrentHashMap<>(); + worker = new MemoryTaskManagerWorker(); + workQueue + .subscribeOn(Schedulers.single()) + .filter(task -> !listIds(Status.CANCELLED).contains(task.getId())) + .subscribe(this::treatTask); } - @Override - public TaskId submit(Task task) { - return submit(task, id -> { }); + private void treatTask(TaskWithId task) { + Mono<Task.Result> result = worker.executeTask(task, updateDetails(task.getId())); + tasksResult.put(task.getId(), result); + try { + result.block(); + } catch (CancellationException e) { + // Do not throw CancellationException + } } - @VisibleForTesting - TaskId submit(Task task, Consumer<TaskId> callback) { + public TaskId submit(Task task) { TaskId taskId = TaskId.generateTaskId(); TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId); - idToExecutionDetails.put(taskId, executionDetails); - idToFuture.put(taskId, - executor.submit(() -> runWithMdc(executionDetails, task, callback))); + workQueue.onNext(new TaskWithId(taskId, task)); return taskId; } - private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) { - MDCBuilder.withMdc( - MDCBuilder.create() - .addContext(Task.TASK_ID, executionDetails.getTaskId()) - .addContext(Task.TASK_TYPE, executionDetails.getType()) - .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()), - () -> run(executionDetails, task, callback)); - } - - private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) { - TaskExecutionDetails started = executionDetails.start(); - idToExecutionDetails.put(started.getTaskId(), started); - try { - task.run() - .onComplete(() -> success(started)) - .onFailure(() -> failed(started, - logger -> logger.info("Task was partially performed. Check logs for more details"))); - } catch (Exception e) { - failed(started, - logger -> logger.error("Error while running task", executionDetails, e)); - } finally { - idToFuture.remove(executionDetails.getTaskId()); - callback.accept(executionDetails.getTaskId()); - } - } - - private void success(TaskExecutionDetails started) { - if (!wasCancelled(started.getTaskId())) { - idToExecutionDetails.put(started.getTaskId(), started.completed()); - LOGGER.info("Task success"); - } - } - - private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) { - if (!wasCancelled(started.getTaskId())) { - idToExecutionDetails.put(started.getTaskId(), started.failed()); - logOperation.accept(LOGGER); - } - } - - private boolean wasCancelled(TaskId taskId) { - return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED; - } - @Override public TaskExecutionDetails getExecutionDetails(TaskId id) { return Optional.ofNullable(idToExecutionDetails.get(id)) @@ -128,32 +95,55 @@ public class MemoryTaskManager implements TaskManager { @Override public List<TaskExecutionDetails> list(Status status) { - return idToExecutionDetails.values() + return ImmutableList.copyOf(tasksFiltered(status).values()); + } + + public Set<TaskId> listIds(Status status) { + return tasksFiltered(status).keySet(); + } + + public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) { + return idToExecutionDetails.entrySet() .stream() - .filter(details -> details.getStatus().equals(status)) - .collect(Guavate.toImmutableList()); + .filter(details -> details.getValue().getStatus().equals(status)) + .collect(Guavate.entriesToImmutableMap()); } @Override public void cancel(TaskId id) { - Optional.ofNullable(idToFuture.get(id)) - .ifPresent(future -> { - TaskExecutionDetails executionDetails = idToExecutionDetails.get(id); - idToExecutionDetails.put(id, executionDetails.cancel()); - future.cancel(INTERRUPT_IF_RUNNING); - idToFuture.remove(id); - }); + TaskExecutionDetails details = getExecutionDetails(id); + if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) { + worker.cancelTask(id, updateDetails(id)); + } } @Override public TaskExecutionDetails await(TaskId id) { - Optional.ofNullable(idToFuture.get(id)) - .ifPresent(Throwing.consumer(Future::get)); - return getExecutionDetails(id); + if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) { + return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()) + .filter(ignore -> tasksResult.get(id) != null) + .map(ignore -> { + Optional.ofNullable(tasksResult.get(id)) + .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing()); + return getExecutionDetails(id); + }) + .take(1) + .blockFirst(); + } else { + return null; + } } @PreDestroy public void stop() { - executor.shutdownNow(); + taskExecutor.shutdown(); + requestTaskExecutor.shutdown(); + } + + private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) { + return updater -> { + TaskExecutionDetails newDetails = updater.update(idToExecutionDetails.get(taskId)); + idToExecutionDetails.replace(taskId, newDetails); + }; } } diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java index cec818b..c80f295 100644 --- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java @@ -22,6 +22,9 @@ package org.apache.james.task; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Duration.ONE_MINUTE; +import static org.awaitility.Duration.ONE_SECOND; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -29,6 +32,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.api.JUnitSoftAssertions; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.awaitility.core.ConditionFactory; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -39,6 +45,7 @@ import com.github.fge.lambdas.consumers.ConsumerChainer; public class MemoryTaskManagerTest { + public static final int TIME_TO_ENQUEUE = 200; private MemoryTaskManager memoryTaskManager; @Rule @@ -54,6 +61,16 @@ public class MemoryTaskManagerTest { memoryTaskManager.stop(); } + Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; + ConditionFactory calmlyAwait = Awaitility.with() + .pollInterval(slowPacedPollInterval) + .and() + .with() + .pollDelay(slowPacedPollInterval) + .await(); + ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE); + ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND); + @Test public void getStatusShouldReturnUnknownWhenUnknownId() { TaskId unknownId = TaskId.generateTaskId(); @@ -87,6 +104,7 @@ public class MemoryTaskManagerTest { return Task.Result.COMPLETED; }); + sleep(TIME_TO_ENQUEUE); memoryTaskManager.cancel(id); task1Latch.countDown(); @@ -94,26 +112,21 @@ public class MemoryTaskManagerTest { } @Test - public void getStatusShouldReturnCancelledWhenCancelled() throws Exception { - CountDownLatch task1Latch = new CountDownLatch(1); - CountDownLatch ensureStartedLatch = new CountDownLatch(1); - CountDownLatch ensureFinishedLatch = new CountDownLatch(1); + public void getStatusShouldBeCancelledWhenCancelled() throws Exception { TaskId id = memoryTaskManager.submit(() -> { - ensureStartedLatch.countDown(); - await(task1Latch); + sleep(500); return Task.Result.COMPLETED; - }, - any -> ensureFinishedLatch.countDown()); + }); - ensureStartedLatch.await(); + sleep(TIME_TO_ENQUEUE); memoryTaskManager.cancel(id); - ensureFinishedLatch.await(); assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.CANCELLED); } + @Test public void cancelShouldBeIdempotent() { CountDownLatch task1Latch = new CountDownLatch(1); @@ -122,7 +135,7 @@ public class MemoryTaskManagerTest { await(task1Latch); return Task.Result.COMPLETED; }); - + sleep(TIME_TO_ENQUEUE); memoryTaskManager.cancel(id); assertThatCode(() -> memoryTaskManager.cancel(id)) .doesNotThrowAnyException(); @@ -146,13 +159,11 @@ public class MemoryTaskManagerTest { @Test public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception { - CountDownLatch latch = new CountDownLatch(1); TaskId taskId = memoryTaskManager.submit( - () -> Task.Result.COMPLETED, - countDownCallback(latch)); + () -> Task.Result.COMPLETED); - latch.await(); + sleep(500); assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) .isEqualTo(TaskManager.Status.COMPLETED); @@ -160,13 +171,11 @@ public class MemoryTaskManagerTest { @Test public void getStatusShouldReturnFailedWhenRunPartially() throws Exception { - CountDownLatch latch = new CountDownLatch(1); TaskId taskId = memoryTaskManager.submit( - () -> Task.Result.PARTIAL, - countDownCallback(latch)); + () -> Task.Result.PARTIAL); - latch.await(); + sleep(TIME_TO_ENQUEUE); assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) .isEqualTo(TaskManager.Status.FAILED); @@ -213,6 +222,7 @@ public class MemoryTaskManagerTest { .isEqualTo(TaskManager.Status.COMPLETED); softly.assertThat(entryWithId(list, inProgressId)) .isEqualTo(TaskManager.Status.IN_PROGRESS); + latch3.countDown(); } private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) { @@ -369,24 +379,42 @@ public class MemoryTaskManagerTest { } @Test + public void awaitShouldAwaitWaitingTask() { + CountDownLatch latch = new CountDownLatch(1); + memoryTaskManager.submit( + () -> { + await(latch); + return Task.Result.COMPLETED; + }); + latch.countDown(); + TaskId task2 = memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + + assertThat(memoryTaskManager.await(task2).getStatus()).isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test public void submittedTaskShouldExecuteSequentially() { ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); TaskId id1 = memoryTaskManager.submit(() -> { queue.add(1); - sleep(500); + sleep(200); queue.add(2); return Task.Result.COMPLETED; }); TaskId id2 = memoryTaskManager.submit(() -> { queue.add(3); - sleep(500); + sleep(200); queue.add(4); return Task.Result.COMPLETED; }); + sleep(TIME_TO_ENQUEUE); memoryTaskManager.await(id1); memoryTaskManager.await(id2); + awaitAtMostOneSecond.until(() -> queue.contains(4)); + assertThat(queue) .containsExactly(1, 2, 3, 4); } @@ -396,9 +424,8 @@ public class MemoryTaskManagerTest { TaskId taskId = memoryTaskManager.submit(() -> { throw new RuntimeException(); }); - + sleep(TIME_TO_ENQUEUE); TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId); - assertThat(executionDetails.getStatus()) .isEqualTo(TaskManager.Status.FAILED); } @@ -408,7 +435,7 @@ public class MemoryTaskManagerTest { TaskId taskId = memoryTaskManager.submit(() -> { throw new RuntimeException(); }); - + sleep(TIME_TO_ENQUEUE); memoryTaskManager.await(taskId); assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
