This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 12559bdf8638c692950c97c115905d9c5b1983c4
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Oct 9 11:58:12 2019 +0200

    JAMES-2813 put executeWithSemaphore building blocks into submethods
---
 .../apache/james/task/SerialTaskManagerWorker.java | 31 +++++++++++-----------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index b954aa2..8fb0e15 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.google.common.collect.Sets;
-import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -89,19 +88,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
                 CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor);
                 runningTask.set(Tuples.of(taskWithId.getId(), future));

-                Disposable informationPolling = pollAdditionalInformation(taskWithId)
-                    .doOnNext(information -> listener.updated(taskWithId.getId(), information))
-                    .subscribe();
-                return Mono.fromFuture(future)
-                    .doOnError(exception -> {
-                        if (exception instanceof CancellationException) {
-                            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
-                        } else {
-                            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
-                        }
-                    })
-                    .onErrorReturn(Task.Result.PARTIAL)
-                    .doOnTerminate(informationPolling::dispose);
+                return Mono.using(
+                    () -> pollAdditionalInformation(taskWithId).subscribe(),
+                    ignored -> Mono.fromFuture(future)
+                        .doOnError(exception -> handleExecutionError(taskWithId, listener, exception))
+                        .onErrorReturn(Task.Result.PARTIAL),
+                    polling -> polling.dispose());
             } else {
                 listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
                 return Mono.empty();
@@ -109,11 +101,20 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
         };
     }

+    private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) {
+        if (exception instanceof CancellationException) {
+            listener.cancelled(taskWithId.getId(), taskWithId.getTask().details());
+        } else {
+            listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception);
+        }
+    }
+
     private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
             .delayElement(Duration.ofSeconds(1))
             .repeat()
-            .flatMap(Mono::justOrEmpty);
+            .flatMap(Mono::justOrEmpty)
+            .doOnNext(information -> listener.updated(taskWithId.getId(), taskWithId.getTask().type(), information));
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
