This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e1ff8c54a13a4609c1d42cae9c001da2e1ef36c0
Author: RĂ©mi KOWALSKI <[email protected]>
AuthorDate: Thu Oct 17 10:55:50 2019 +0200

    JAMES-2813 change polling intervall and make it a parameter of the worker
---
 .../java/org/apache/james/task/TaskManagerContract.java     |  2 +-
 .../distributed/RabbitMQWorkQueueSupplier.scala             | 13 +++++++++++--
 .../distributed/DistributedTaskManagerTest.java             |  3 +--
 .../main/java/org/apache/james/task/MemoryTaskManager.java  |  3 ++-
 .../java/org/apache/james/task/SerialTaskManagerWorker.java |  6 ++++--
 .../org/apache/james/task/SerialTaskManagerWorkerTest.java  |  8 +++++---
 .../task/eventsourcing/EventSourcingTaskManagerTest.java    |  2 +-
 7 files changed, 25 insertions(+), 12 deletions(-)

diff --git 
a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
 
b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
index d482eed..406671f 100644
--- 
a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ 
b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -38,7 +38,7 @@ import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Test;
 
 public interface TaskManagerContract {
-
+    java.time.Duration UPDATE_INFORMATION_POLLING_INTERVAL = 
java.time.Duration.ofSeconds(1);
     Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
     ConditionFactory calmlyAwait = Awaitility.with()
         .pollInterval(slowPacedPollInterval)
diff --git 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
index a6c468c..f86be44 100644
--- 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
+++ 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala
@@ -18,8 +18,10 @@
  * ***************************************************************/
 package org.apache.james.task.eventsourcing.distributed
 
-import javax.inject.Inject
+import java.time.Duration
 
+import com.google.common.annotations.VisibleForTesting
+import javax.inject.Inject
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.server.task.json.JsonTaskSerializer
@@ -28,9 +30,16 @@ import 
org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListe
 
 class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: 
SimpleConnectionPool,
                                 private val jsonTaskSerializer: 
JsonTaskSerializer) extends WorkQueueSupplier {
+
+  val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL =  Duration.ofSeconds(30)
   override def apply(eventSourcingSystem: EventSourcingSystem): 
RabbitMQWorkQueue = {
+     apply(eventSourcingSystem, 
DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL)
+  }
+
+  @VisibleForTesting
+  def apply(eventSourcingSystem: EventSourcingSystem, 
additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = {
     val listener = WorkerStatusListener(eventSourcingSystem)
-    val worker = new SerialTaskManagerWorker(listener)
+    val worker = new SerialTaskManagerWorker(listener, 
additionalInformationPollingInterval)
     val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, 
rabbitMQConnectionPool, jsonTaskSerializer)
     rabbitMQWorkQueue.start()
     rabbitMQWorkQueue
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 6774b67..3f4be63 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -68,7 +68,6 @@ import org.awaitility.Duration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.steveash.guavate.Guavate;
@@ -86,7 +85,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
 
         @Override
         public WorkQueue apply(EventSourcingSystem eventSourcingSystem) {
-            RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem);
+            RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, 
UPDATE_INFORMATION_POLLING_INTERVAL);
             workQueues.add(workQueue);
             return workQueue;
         }
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 56ffd2d..64f60e9 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -93,6 +93,7 @@ public class MemoryTaskManager implements TaskManager {
         }
     }
 
+    private static final Duration UPDATE_INFORMATION_POLLING_DURATION = 
Duration.ofSeconds(5);
     private static final Duration AWAIT_POLLING_DURATION = 
Duration.ofMillis(500);
     public static final Duration NOW = Duration.ZERO;
 
@@ -105,7 +106,7 @@ public class MemoryTaskManager implements TaskManager {
     public MemoryTaskManager(Hostname hostname) {
         this.hostname = hostname;
         this.idToExecutionDetails = new ConcurrentHashMap<>();
-        this.worker = new SerialTaskManagerWorker(updater());
+        this.worker = new SerialTaskManagerWorker(updater(), 
UPDATE_INFORMATION_POLLING_DURATION);
         workQueue = new MemoryWorkQueue(worker);
     }
 
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index c296bf1..34f4548 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -52,8 +52,10 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
     private final Semaphore semaphore;
     private final Set<TaskId> cancelledTasks;
+    private final Duration pollingInterval;
 
-    public SerialTaskManagerWorker(Listener listener) {
+    public SerialTaskManagerWorker(Listener listener, Duration 
pollingInterval) {
+        this.pollingInterval = pollingInterval;
         this.taskExecutor = 
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
         this.listener = listener;
         this.cancelledTasks = Sets.newConcurrentHashSet();
@@ -112,7 +114,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
 
     private Flux<TaskExecutionDetails.AdditionalInformation> 
pollAdditionalInformation(TaskWithId taskWithId) {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
-            .delayElement(Duration.ofSeconds(1), Schedulers.boundedElastic())
+            .delayElement(pollingInterval, Schedulers.boundedElastic())
             .repeat()
             .flatMap(Mono::justOrEmpty)
             .doOnNext(information -> listener.updated(taskWithId.getId(), 
information));
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index b693347..a83392f 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
 
 class SerialTaskManagerWorkerTest {
+    private  static final Duration UPDATE_INFORMATION_POLLING_DURATION = 
Duration.ofSeconds(1);
+
     private TaskManagerWorker.Listener listener;
     private SerialTaskManagerWorker worker;
 
@@ -54,7 +56,7 @@ class SerialTaskManagerWorkerTest {
     @BeforeEach
     void beforeEach() {
         listener = mock(TaskManagerWorker.Listener.class);
-        worker = new SerialTaskManagerWorker(listener);
+        worker = new SerialTaskManagerWorker(listener, 
UPDATE_INFORMATION_POLLING_DURATION);
     }
 
     @AfterEach
@@ -94,13 +96,13 @@ class SerialTaskManagerWorkerTest {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new 
MemoryReferenceWithCounterTask((counter) ->
             Mono.fromCallable(counter::incrementAndGet)
                 .delayElement(Duration.ofSeconds(1))
-                .repeat(2)
+                .repeat(3)
                 .then(Mono.just(Task.Result.COMPLETED))
                 .block()));
 
         worker.executeTask(taskWithId).block();
 
-        verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
+        verify(listener, atMost(4)).updated(eq(taskWithId.getId()), notNull());
     }
 
     @Test
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index d3d9653..05ef373 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -58,7 +58,7 @@ class EventSourcingTaskManagerTest implements 
TaskManagerContract {
         TaskExecutionDetailsProjection executionDetailsProjection = new 
MemoryTaskExecutionDetailsProjection();
         WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
             WorkerStatusListener listener = new 
WorkerStatusListener(eventSourcingSystem);
-            TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+            TaskManagerWorker worker = new SerialTaskManagerWorker(listener, 
UPDATE_INFORMATION_POLLING_INTERVAL);
             return new MemoryWorkQueue(worker);
         };
         taskManager = new EventSourcingTaskManager(workQueueSupplier, 
eventStore, executionDetailsProjection, HOSTNAME, new 
MemoryTerminationSubscriber());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to