This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3adf8bcb65142cfda99e41f5d80fe123d97189cb Author: Rémi Kowalski <[email protected]> AuthorDate: Wed May 22 14:12:27 2019 +0200 JAMES-2777 add task requested for cancellation status --- .../james/webadmin/routes/TasksRoutesTest.java | 20 ++++++- .../org/apache/james/task/MemoryTaskManager.java | 34 ++++++++--- .../apache/james/task/MemoryTaskManagerWorker.java | 56 +++++++++++++++--- .../apache/james/task/TaskExecutionDetails.java | 16 ++++- .../java/org/apache/james/task/TaskManager.java | 1 + .../apache/james/task/MemoryTaskManagerTest.java | 68 +++++++++++++++++++++- .../james/task/MemoryTaskManagerWorkerTest.java | 47 ++++++++++++++- 7 files changed, 218 insertions(+), 24 deletions(-) diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java index b65a84f..825a41a 100644 --- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java +++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java @@ -26,6 +26,8 @@ import static org.apache.james.webadmin.WebAdminServer.NO_CONFIGURATION; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; +import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; import java.util.UUID; @@ -204,8 +206,14 @@ public class TasksRoutesTest { @Test public void deleteShouldCancelMatchingTask() { + CountDownLatch inProgressLatch = new CountDownLatch(1); + TaskId taskId = taskManager.submit(() -> { - await(); + try { + inProgressLatch.await(); + } catch (InterruptedException e) { + //ignore + } return Task.Result.COMPLETED; }); @@ -216,7 +224,17 @@ public class TasksRoutesTest { .get("/" + taskId.getValue()) .then() .statusCode(HttpStatus.OK_200) + .body("status", isOneOf("canceledRequested", "canceled")); + + inProgressLatch.countDown(); + when() + .get("/" + taskId.getValue()) + .then() + .statusCode(HttpStatus.OK_200) .body("status", is("canceled")); + + + } @Test diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index 1623858..566ce32 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; +import java.util.function.Predicate; import javax.annotation.PreDestroy; @@ -60,8 +61,24 @@ public class MemoryTaskManager implements TaskManager { worker = new MemoryTaskManagerWorker(); workQueue .subscribeOn(Schedulers.single()) - .filter(task -> !listIds(Status.CANCELLED).contains(task.getId())) - .subscribe(this::treatTask); + .filter(isTaskWaiting().or(isTaskRequestedForCancellation())) + .subscribe(this::sendTaskToWorker); + } + + private void sendTaskToWorker(TaskWithId taskWithId) { + if (isTaskWaiting().test(taskWithId)) { + treatTask(taskWithId); + } else if (isTaskRequestedForCancellation().test(taskWithId)) { + updateDetails(taskWithId.getId()).accept(TaskExecutionDetails::cancelEffectively); + } + } + + private Predicate<TaskWithId> isTaskWaiting() { + return task -> listIds(Status.WAITING).contains(task.getId()); + } + + private Predicate<TaskWithId> isTaskRequestedForCancellation() { + return task -> listIds(Status.CANCEL_REQUESTED).contains(task.getId()); } private void treatTask(TaskWithId task) { @@ -111,15 +128,18 @@ public class MemoryTaskManager implements TaskManager { @Override public void cancel(TaskId id) { - TaskExecutionDetails details = getExecutionDetails(id); - if (details.getStatus().equals(Status.IN_PROGRESS) || details.getStatus().equals(Status.WAITING)) { - worker.cancelTask(id, updateDetails(id)); - } + Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> { + if (details.getStatus().equals(Status.WAITING)) { + updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested()); + } + worker.cancelTask(id, updateDetails(id)); + } + ); } @Override public TaskExecutionDetails await(TaskId id) { - if (Optional.ofNullable(getExecutionDetails(id)).isPresent()) { + if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) { return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()) .filter(ignore -> tasksResult.get(id) != null) .map(ignore -> { diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java index 6feb85a..e69e641 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java @@ -18,21 +18,27 @@ ****************************************************************/ package org.apache.james.task; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Consumer; +import org.apache.james.util.FunctionalUtils; import org.apache.james.util.MDCBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class MemoryTaskManagerWorker implements TaskManagerWorker { private static final boolean INTERRUPT_IF_RUNNING = true; private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class); + public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100); + public static final int FIRST = 1; private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>(); @Override @@ -42,8 +48,12 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker { idToFuture.put(taskWithId.getId(), futureResult); Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult) - .doOnError(res -> failed(updateDetails, - (logger, details) -> logger.error("Task was partially performed. Check logs for more details"))) + .doOnError(res -> { + if (!(res instanceof CancellationException)) { + failed(updateDetails, + (logger, details) -> logger.error("Task was partially performed. Check logs for more details")); + } + }) .doOnTerminate(() -> idToFuture.remove(taskWithId.getId())); return result; @@ -77,16 +87,44 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker { public void cancelTask(TaskId id, Consumer<TaskExecutionDetailsUpdater> updateDetails) { Optional.ofNullable(idToFuture.remove(id)) .ifPresent(future -> { - updateDetails.accept(details -> { - if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) { - return details.cancel(); - } - return details; - }); - future.cancel(INTERRUPT_IF_RUNNING); + requestCancellation(updateDetails, future); + waitUntilFutureIsCancelled(future) + .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails)); }); } + private void requestCancellation(Consumer<TaskExecutionDetailsUpdater> updateDetails, CompletableFuture<Task.Result> future) { + updateDetails.accept(details -> { + if (details.getStatus().equals(TaskManager.Status.WAITING) || details.getStatus().equals(TaskManager.Status.IN_PROGRESS)) { + return details.cancelRequested(); + } + return details; + }); + future.cancel(INTERRUPT_IF_RUNNING); + } + + private Flux<Boolean> waitUntilFutureIsCancelled(CompletableFuture<Task.Result> future) { + return Flux.interval(CHECK_CANCELED_PERIOD) + .map(ignore -> future.isCancelled()) + .filter(FunctionalUtils.identityPredicate()) + .take(FIRST); + } + + private void effectivelyCancelled(Consumer<TaskExecutionDetailsUpdater> updateDetails) { + updateDetails.accept(details -> { + if (canBeCancelledEffectively(details)) { + return details.cancelEffectively(); + } + return details; + }); + } + + private boolean canBeCancelledEffectively(TaskExecutionDetails details) { + return details.getStatus().equals(TaskManager.Status.WAITING) + || details.getStatus().equals(TaskManager.Status.IN_PROGRESS) + || details.getStatus().equals(TaskManager.Status.CANCEL_REQUESTED); + } + private void success(Consumer<TaskExecutionDetailsUpdater> updateDetails) { updateDetails.accept(currentDetails -> { if (!wasCancelled(currentDetails)) { diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java index a95627c..06b9ecf 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java +++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java @@ -140,12 +140,26 @@ public class TaskExecutionDetails { Optional.of(ZonedDateTime.now())); } - public TaskExecutionDetails cancel() { + public TaskExecutionDetails cancelRequested() { Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS || status == TaskManager.Status.WAITING); return new TaskExecutionDetails( taskId, task, + TaskManager.Status.CANCEL_REQUESTED, + submitDate, + startedDate, + Optional.empty(), + Optional.of(ZonedDateTime.now()), + Optional.empty()); + } + + public TaskExecutionDetails cancelEffectively() { + Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS + || status == TaskManager.Status.WAITING || status == TaskManager.Status.CANCEL_REQUESTED); + return new TaskExecutionDetails( + taskId, + task, TaskManager.Status.CANCELLED, submitDate, startedDate, diff --git a/server/task/src/main/java/org/apache/james/task/TaskManager.java b/server/task/src/main/java/org/apache/james/task/TaskManager.java index a3fabc3..de3e16b 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/TaskManager.java @@ -27,6 +27,7 @@ public interface TaskManager { WAITING("waiting"), IN_PROGRESS("inProgress"), COMPLETED("completed"), + CANCEL_REQUESTED("canceledRequested"), CANCELLED("canceled"), FAILED("failed"); diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java index c857ec1..676071f 100644 --- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java @@ -108,6 +108,35 @@ public class MemoryTaskManagerTest { } @Test + public void completedTaskShouldNotBeCancelled() { + TaskId id = memoryTaskManager.submit(() -> Task.Result.COMPLETED); + + awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED); + memoryTaskManager.cancel(id); + + try { + awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED); + } catch (Exception e) { + //Should timeout + } + assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.COMPLETED); + } + @Test + public void failedTaskShouldNotBeCancelled() { + TaskId id = memoryTaskManager.submit(() -> Task.Result.PARTIAL); + + awaitUntilTaskHasStatus(id, TaskManager.Status.FAILED); + memoryTaskManager.cancel(id); + + try { + awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED); + } catch (Exception e) { + //Should timeout + } + assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()).isEqualTo(TaskManager.Status.FAILED); + } + + @Test public void getStatusShouldBeCancelledWhenCancelled() { TaskId id = memoryTaskManager.submit(() -> { sleep(500); @@ -118,7 +147,35 @@ public class MemoryTaskManagerTest { memoryTaskManager.cancel(id); assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()) + .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED); + + awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED); + assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.CANCELLED); + + } + + @Test + public void aWaitingTaskShouldBeCancelled() { + TaskId id = memoryTaskManager.submit(() -> { + sleep(500); + return Task.Result.COMPLETED; + }); + + TaskId idTaskToCancel = memoryTaskManager.submit(() -> Task.Result.COMPLETED); + + memoryTaskManager.cancel(idTaskToCancel); + + awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS); + + + assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus()) + .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED); + + awaitUntilTaskHasStatus(idTaskToCancel, TaskManager.Status.CANCELLED); + assertThat(memoryTaskManager.getExecutionDetails(idTaskToCancel).getStatus()) + .isEqualTo(TaskManager.Status.CANCELLED); + } @Test @@ -345,17 +402,22 @@ public class MemoryTaskManagerTest { } @Test - public void listShouldBeEmptyWhenNoTasks() throws Exception { + public void listShouldBeEmptyWhenNoTasks() { assertThat(memoryTaskManager.list()).isEmpty(); } @Test - public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception { + public void listCancelledShouldBeEmptyWhenNoTasks() { assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty(); } @Test - public void awaitShouldNotThrowWhenCompletedTask() throws Exception { + public void listCancelRequestedShouldBeEmptyWhenNoTasks() { + assertThat(memoryTaskManager.list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty(); + } + + @Test + public void awaitShouldNotThrowWhenCompletedTask() { TaskId taskId = memoryTaskManager.submit( () -> Task.Result.COMPLETED); memoryTaskManager.await(taskId); diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java index e4ae643..10c4d6a 100644 --- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java +++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerWorkerTest.java @@ -19,12 +19,17 @@ package org.apache.james.task; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Duration.ONE_SECOND; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.awaitility.core.ConditionFactory; import org.junit.Test; import reactor.core.publisher.Mono; @@ -33,6 +38,15 @@ public class MemoryTaskManagerWorkerTest { private final MemoryTaskManagerWorker worker = new MemoryTaskManagerWorker(); + private final Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; + private final ConditionFactory calmlyAwait = Awaitility.with() + .pollInterval(slowPacedPollInterval) + .and() + .with() + .pollDelay(slowPacedPollInterval) + .await(); + private final ConditionFactory awaitAtMostOneSecond = calmlyAwait.atMost(ONE_SECOND); + private final Task successfulTask = () -> Task.Result.COMPLETED; private final Task failedTask = () -> Task.Result.PARTIAL; private final Task throwingTask = () -> { @@ -76,6 +90,27 @@ public class MemoryTaskManagerWorkerTest { assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.IN_PROGRESS); latch.countDown(); } + @Test + public void theWorkerShouldNotRunATaskRequestedForCancellation() { + TaskId id = TaskId.generateTaskId(); + AtomicInteger counter = new AtomicInteger(0); + + Task task = () -> { + counter.incrementAndGet(); + return Task.Result.COMPLETED; + }; + + TaskWithId taskWithId = new TaskWithId(id, task); + TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id); + ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>(); + + idToDetails.put(id, executionDetails.cancelRequested()); + + worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe(); + + assertThat(counter.get()).isEqualTo(0); + assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCEL_REQUESTED); + } @Test public void theWorkerShouldNotRunACancelledTask() { @@ -91,9 +126,9 @@ public class MemoryTaskManagerWorkerTest { TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, id); ConcurrentHashMap<TaskId, TaskExecutionDetails> idToDetails = new ConcurrentHashMap<>(); - idToDetails.put(id, executionDetails.cancel()); + idToDetails.put(id, executionDetails.cancelEffectively()); - worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache(); + worker.executeTask(taskWithId, updateDetails(idToDetails, id)).subscribe(); assertThat(counter.get()).isEqualTo(0); assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED); @@ -119,9 +154,11 @@ public class MemoryTaskManagerWorkerTest { worker.executeTask(taskWithId, updateDetails(idToDetails, id)).cache(); worker.cancelTask(id, updateDetails(idToDetails, id)); - assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED); + assertThat(idToDetails.get(id).getStatus()).isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED); assertThat(counter.get()).isEqualTo(0); + + awaitUntilTaskHasStatus(idToDetails, id, TaskManager.Status.CANCELLED); assertThat(idToDetails.get(id).getStatus()).isEqualTo(TaskManager.Status.CANCELLED); } @@ -153,6 +190,10 @@ public class MemoryTaskManagerWorkerTest { }; } + private void awaitUntilTaskHasStatus(ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails, TaskId id, TaskManager.Status status) { + awaitAtMostOneSecond.until(() -> idToExecutionDetails.get(id).getStatus().equals(status)); + } + private void await(CountDownLatch countDownLatch) { try { countDownLatch.await(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
