This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e1e3e5ddd60f25310d0887092b83f92fbb3c59ce Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Oct 9 11:13:19 2019 +0200 JAMES-2813 worker now calls Listener.updated once per second --- .../james/task/MemoryReferenceWithCounterTask.java | 8 ++++ .../org/apache/james/task/MemoryTaskManager.java | 6 +++ .../apache/james/task/SerialTaskManagerWorker.java | 17 +++++++- .../org/apache/james/task/TaskManagerWorker.java | 2 + .../task/eventsourcing/WorkerStatusListener.scala | 2 + .../james/task/SerialTaskManagerWorkerTest.java | 49 ++++++++++++++++++++++ 6 files changed, 83 insertions(+), 1 deletion(-) diff --git a/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java b/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java index 0a43f4c..819737d 100644 --- a/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java +++ b/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import com.github.fge.lambdas.functions.ThrowingFunction; +import com.google.common.base.MoreObjects; /** * This class is used for unit testing. @@ -62,6 +63,13 @@ public class MemoryReferenceWithCounterTask implements Task { public int hashCode() { return Objects.hashCode(this.count); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("count", count) + .toString(); + } } private final ThrowingFunction<AtomicLong, Result> task; diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java index bffc24e..56ffd2d 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -85,6 +85,12 @@ public class MemoryTaskManager implements TaskManager { updaterFactory.apply(taskId) .accept(details -> details.cancelEffectively(additionalInformation)); } + + @Override + public void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation) { + //The memory task manager doesn't use polling to update its additionalInformation. + throw new IllegalStateException(); + } } private static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500); diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index c7ac921..b954aa2 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -19,6 +19,7 @@ package org.apache.james.task; import java.io.IOException; +import java.time.Duration; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -37,6 +38,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -86,6 +89,9 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor); runningTask.set(Tuples.of(taskWithId.getId(), future)); + Disposable informationPolling = pollAdditionalInformation(taskWithId) + .doOnNext(information -> listener.updated(taskWithId.getId(), information)) + .subscribe(); return Mono.fromFuture(future) .doOnError(exception -> { if (exception instanceof CancellationException) { @@ -94,7 +100,8 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception); } }) - .onErrorReturn(Task.Result.PARTIAL); + .onErrorReturn(Task.Result.PARTIAL) + .doOnTerminate(informationPolling::dispose); } else { listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); return Mono.empty(); @@ -102,6 +109,14 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { }; } + private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) { + return Mono.fromCallable(() -> taskWithId.getTask().details()) + .delayElement(Duration.ofSeconds(1)) + .repeat() + .flatMap(Mono::justOrEmpty); + } + + private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) { return MDCBuilder.withMdc( MDCBuilder.create() diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java index d24206b..c6475e0 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java @@ -37,6 +37,8 @@ public interface TaskManagerWorker extends Closeable { void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); void cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); + + void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation); } Mono<Task.Result> executeTask(TaskWithId taskWithId); diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala index b184c39..b6a3f80 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala @@ -48,4 +48,6 @@ case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extend override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit = eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala )) + + override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit = ??? } \ No newline at end of file diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 527d1d2..7448a20 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -21,8 +21,11 @@ package org.apache.james.task; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -30,6 +33,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Awaitility; @@ -70,6 +74,51 @@ class SerialTaskManagerWorkerTest { } @Test + void aRunningTaskShouldProvideInformationUpdatesDuringExecution() throws InterruptedException { + TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> + Mono.fromCallable(counter::incrementAndGet) + .delayElement(Duration.ofSeconds(2)) + .repeat(10) + .then(Mono.just(Task.Result.COMPLETED)) + .block())); + + worker.executeTask(taskWithId).subscribe(); + + TimeUnit.SECONDS.sleep(2); + + verify(listener, atLeastOnce()).updated(eq(taskWithId.getId()), notNull()); + } + + @Test + void aRunningTaskShouldHaveAFiniteNumberOfInformation() throws InterruptedException { + TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> + Mono.fromCallable(counter::incrementAndGet) + .delayElement(Duration.ofSeconds(1)) + .repeat(2) + .then(Mono.just(Task.Result.COMPLETED)) + .block())); + + worker.executeTask(taskWithId).block(); + + verify(listener, atMost(2)).updated(eq(taskWithId.getId()), notNull()); + } + + + @Test + void aRunningTaskShouldEmitAtMostOneInformationPerSecond() { + TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> + Mono.fromCallable(counter::incrementAndGet) + .delayElement(Duration.ofMillis(10)) + .repeat(200) + .then(Mono.just(Task.Result.COMPLETED)) + .block())); + + worker.executeTask(taskWithId).block(); + + verify(listener, times(2)).updated(eq(taskWithId.getId()), notNull()); + } + + @Test void aFailedTaskShouldCompleteWithFailedStatus() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), failedTask); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
