This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e1ff8c54a13a4609c1d42cae9c001da2e1ef36c0 Author: Rémi KOWALSKI <[email protected]> AuthorDate: Thu Oct 17 10:55:50 2019 +0200 JAMES-2813 change polling interval and make it a parameter of the worker --- .../java/org/apache/james/task/TaskManagerContract.java | 2 +- .../distributed/RabbitMQWorkQueueSupplier.scala | 13 +++++++++++-- .../distributed/DistributedTaskManagerTest.java | 3 +-- .../main/java/org/apache/james/task/MemoryTaskManager.java | 3 ++- .../java/org/apache/james/task/SerialTaskManagerWorker.java | 6 ++++-- .../org/apache/james/task/SerialTaskManagerWorkerTest.java | 8 +++++--- .../task/eventsourcing/EventSourcingTaskManagerTest.java | 2 +- 7 files changed, 25 insertions(+), 12 deletions(-) diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java index d482eed..406671f 100644 --- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java +++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java @@ -38,7 +38,7 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; public interface TaskManagerContract { - + java.time.Duration UPDATE_INFORMATION_POLLING_INTERVAL = java.time.Duration.ofSeconds(1); Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS; ConditionFactory calmlyAwait = Awaitility.with() .pollInterval(slowPacedPollInterval) diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala index a6c468c..f86be44 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala +++ 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala @@ -18,8 +18,10 @@ * ***************************************************************/ package org.apache.james.task.eventsourcing.distributed -import javax.inject.Inject +import java.time.Duration +import com.google.common.annotations.VisibleForTesting +import javax.inject.Inject import org.apache.james.backends.rabbitmq.SimpleConnectionPool import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.server.task.json.JsonTaskSerializer @@ -28,9 +30,16 @@ import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListe class RabbitMQWorkQueueSupplier @Inject()(private val rabbitMQConnectionPool: SimpleConnectionPool, private val jsonTaskSerializer: JsonTaskSerializer) extends WorkQueueSupplier { + + val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30) override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = { + apply(eventSourcingSystem, DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL) + } + + @VisibleForTesting + def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = { val listener = WorkerStatusListener(eventSourcingSystem) - val worker = new SerialTaskManagerWorker(listener) + val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval) val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, rabbitMQConnectionPool, jsonTaskSerializer) rabbitMQWorkQueue.start() rabbitMQWorkQueue diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index 6774b67..3f4be63 100644 --- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -68,7 +68,6 @@ import org.awaitility.Duration; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import com.github.steveash.guavate.Guavate; @@ -86,7 +85,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Override public WorkQueue apply(EventSourcingSystem eventSourcingSystem) { - RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem); + RabbitMQWorkQueue workQueue = supplier.apply(eventSourcingSystem, UPDATE_INFORMATION_POLLING_INTERVAL); workQueues.add(workQueue); return workQueue; } diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java index 56ffd2d..64f60e9 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -93,6 +93,7 @@ public class MemoryTaskManager implements TaskManager { } } + private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(5); private static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500); public static final Duration NOW = Duration.ZERO; @@ -105,7 +106,7 @@ public class MemoryTaskManager implements TaskManager { public MemoryTaskManager(Hostname hostname) { this.hostname = hostname; this.idToExecutionDetails = new ConcurrentHashMap<>(); - this.worker = new SerialTaskManagerWorker(updater()); + this.worker = new SerialTaskManagerWorker(updater(), UPDATE_INFORMATION_POLLING_DURATION); 
workQueue = new MemoryWorkQueue(worker); } diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index c296bf1..34f4548 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -52,8 +52,10 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask; private final Semaphore semaphore; private final Set<TaskId> cancelledTasks; + private final Duration pollingInterval; - public SerialTaskManagerWorker(Listener listener) { + public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) { + this.pollingInterval = pollingInterval; this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")); this.listener = listener; this.cancelledTasks = Sets.newConcurrentHashSet(); @@ -112,7 +114,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) { return Mono.fromCallable(() -> taskWithId.getTask().details()) - .delayElement(Duration.ofSeconds(1), Schedulers.boundedElastic()) + .delayElement(pollingInterval, Schedulers.boundedElastic()) .repeat() .flatMap(Mono::justOrEmpty) .doOnNext(information -> listener.updated(taskWithId.getId(), information)); diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index b693347..a83392f 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -44,6 +44,8 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; class SerialTaskManagerWorkerTest { + private static final Duration UPDATE_INFORMATION_POLLING_DURATION = Duration.ofSeconds(1); + private TaskManagerWorker.Listener listener; private SerialTaskManagerWorker worker; @@ -54,7 +56,7 @@ class SerialTaskManagerWorkerTest { @BeforeEach void beforeEach() { listener = mock(TaskManagerWorker.Listener.class); - worker = new SerialTaskManagerWorker(listener); + worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_DURATION); } @AfterEach @@ -94,13 +96,13 @@ class SerialTaskManagerWorkerTest { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> Mono.fromCallable(counter::incrementAndGet) .delayElement(Duration.ofSeconds(1)) - .repeat(2) + .repeat(3) .then(Mono.just(Task.Result.COMPLETED)) .block())); worker.executeTask(taskWithId).block(); - verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull()); + verify(listener, atMost(4)).updated(eq(taskWithId.getId()), notNull()); } @Test diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index d3d9653..05ef373 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -58,7 +58,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection(); WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> { WorkerStatusListener listener = new 
WorkerStatusListener(eventSourcingSystem); - TaskManagerWorker worker = new SerialTaskManagerWorker(listener); + TaskManagerWorker worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_INTERVAL); return new MemoryWorkQueue(worker); }; taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME, new MemoryTerminationSubscriber()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
