This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e1e3e5ddd60f25310d0887092b83f92fbb3c59ce
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Oct 9 11:13:19 2019 +0200

    JAMES-2813 worker now calls Listener.updated once per second
---
 .../james/task/MemoryReferenceWithCounterTask.java |  8 ++++
 .../org/apache/james/task/MemoryTaskManager.java   |  6 +++
 .../apache/james/task/SerialTaskManagerWorker.java | 17 +++++++-
 .../org/apache/james/task/TaskManagerWorker.java   |  2 +
 .../task/eventsourcing/WorkerStatusListener.scala  |  2 +
 .../james/task/SerialTaskManagerWorkerTest.java    | 49 ++++++++++++++++++++++
 6 files changed, 83 insertions(+), 1 deletion(-)

diff --git 
a/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java
 
b/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java
index 0a43f4c..819737d 100644
--- 
a/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java
+++ 
b/server/task/task-api/src/test/java/org/apache/james/task/MemoryReferenceWithCounterTask.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.github.fge.lambdas.functions.ThrowingFunction;
+import com.google.common.base.MoreObjects;
 
 /**
  * This class is used for unit testing.
@@ -62,6 +63,13 @@ public class MemoryReferenceWithCounterTask implements Task {
         public int hashCode() {
             return Objects.hashCode(this.count);
         }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("count", count)
+                .toString();
+        }
     }
 
     private final ThrowingFunction<AtomicLong, Result> task;
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index bffc24e..56ffd2d 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -85,6 +85,12 @@ public class MemoryTaskManager implements TaskManager {
             updaterFactory.apply(taskId)
                 .accept(details -> 
details.cancelEffectively(additionalInformation));
         }
+
+        @Override
+        public void updated(TaskId taskId, 
TaskExecutionDetails.AdditionalInformation additionalInformation) {
+            //The memory task manager doesn't use polling to update its 
additionalInformation.
+            throw new IllegalStateException();
+        }
     }
 
     private static final Duration AWAIT_POLLING_DURATION = 
Duration.ofMillis(500);
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index c7ac921..b954aa2 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -19,6 +19,7 @@
 package org.apache.james.task;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -37,6 +38,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
@@ -86,6 +89,9 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
                 CompletableFuture<Task.Result> future = 
CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), 
taskExecutor);
                 runningTask.set(Tuples.of(taskWithId.getId(), future));
 
+                Disposable informationPolling = 
pollAdditionalInformation(taskWithId)
+                    .doOnNext(information -> 
listener.updated(taskWithId.getId(), information))
+                    .subscribe();
                 return Mono.fromFuture(future)
                         .doOnError(exception -> {
                             if (exception instanceof CancellationException) {
@@ -94,7 +100,8 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
                                 listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), exception);
                             }
                         })
-                        .onErrorReturn(Task.Result.PARTIAL);
+                        .onErrorReturn(Task.Result.PARTIAL)
+                        .doOnTerminate(informationPolling::dispose);
             } else {
                 listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
                 return Mono.empty();
@@ -102,6 +109,14 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
         };
     }
 
+    private Flux<TaskExecutionDetails.AdditionalInformation> 
pollAdditionalInformation(TaskWithId taskWithId) {
+        return Mono.fromCallable(() -> taskWithId.getTask().details())
+            .delayElement(Duration.ofSeconds(1))
+            .repeat()
+            .flatMap(Mono::justOrEmpty);
+    }
+
+
     private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) {
         return MDCBuilder.withMdc(
             MDCBuilder.create()
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
index d24206b..c6475e0 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -37,6 +37,8 @@ public interface TaskManagerWorker extends Closeable {
         void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
         void cancelled(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+
+        void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation 
additionalInformation);
     }
 
     Mono<Task.Result> executeTask(TaskWithId taskWithId);
diff --git 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index b184c39..b6a3f80 100644
--- 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -48,4 +48,6 @@ case class WorkerStatusListener(eventSourcingSystem: 
EventSourcingSystem) extend
 
   override def cancelled(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
     eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala 
))
+
+  override def updated(taskId: TaskId, additionalInformation: 
TaskExecutionDetails.AdditionalInformation): Unit = ???
 }
\ No newline at end of file
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 527d1d2..7448a20 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -21,8 +21,11 @@ package org.apache.james.task;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
@@ -30,6 +33,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.awaitility.Awaitility;
@@ -70,6 +74,51 @@ class SerialTaskManagerWorkerTest {
     }
 
     @Test
+    void aRunningTaskShouldProvideInformationUpdatesDuringExecution() throws 
InterruptedException {
+        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new 
MemoryReferenceWithCounterTask((counter) ->
+            Mono.fromCallable(counter::incrementAndGet)
+                .delayElement(Duration.ofSeconds(2))
+                .repeat(10)
+                .then(Mono.just(Task.Result.COMPLETED))
+                .block()));
+
+        worker.executeTask(taskWithId).subscribe();
+
+        TimeUnit.SECONDS.sleep(2);
+
+        verify(listener, atLeastOnce()).updated(eq(taskWithId.getId()), 
notNull());
+    }
+
+    @Test
+    void aRunningTaskShouldHaveAFiniteNumberOfInformation() throws 
InterruptedException {
+        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new 
MemoryReferenceWithCounterTask((counter) ->
+            Mono.fromCallable(counter::incrementAndGet)
+                .delayElement(Duration.ofSeconds(1))
+                .repeat(2)
+                .then(Mono.just(Task.Result.COMPLETED))
+                .block()));
+
+        worker.executeTask(taskWithId).block();
+
+        verify(listener, atMost(2)).updated(eq(taskWithId.getId()), notNull());
+    }
+
+
+    @Test
+    void aRunningTaskShouldEmitAtMostOneInformationPerSecond() {
+        TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new 
MemoryReferenceWithCounterTask((counter) ->
+            Mono.fromCallable(counter::incrementAndGet)
+                .delayElement(Duration.ofMillis(10))
+                .repeat(200)
+                .then(Mono.just(Task.Result.COMPLETED))
+                .block()));
+
+        worker.executeTask(taskWithId).block();
+
+        verify(listener, times(2)).updated(eq(taskWithId.getId()), notNull());
+    }
+
+    @Test
     void aFailedTaskShouldCompleteWithFailedStatus() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), 
failedTask);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to