This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0cd90e1752c0a384c9ff8e65590560454ee67e97 Author: Rémi Kowalski <[email protected]> AuthorDate: Wed Jul 31 17:14:20 2019 +0200 JAMES-2813 extract WorkQueue interface --- .../org/apache/james/task/MemoryTaskManager.java | 2 +- .../task/{WorkQueue.java => MemoryWorkQueue.java} | 13 +---- .../main/java/org/apache/james/task/WorkQueue.java | 62 ++-------------------- .../eventsourcing/EventSourcingTaskManager.scala | 6 ++- .../EventSourcingTaskManagerTest.java | 8 ++- 5 files changed, 17 insertions(+), 74 deletions(-) diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index 2ac480e..ad7bd9d 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -81,7 +81,7 @@ public class MemoryTaskManager implements TaskManager { idToExecutionDetails = new ConcurrentHashMap<>(); worker = new SerialTaskManagerWorker(); - workQueue = WorkQueue.builder().worker(worker); + workQueue = new MemoryWorkQueue(worker); } public TaskId submit(Task task) { diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java similarity index 90% copy from server/task/src/main/java/org/apache/james/task/WorkQueue.java copy to server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java index e2e7035..8a720e6 100644 --- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java @@ -19,7 +19,6 @@ package org.apache.james.task; -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.LinkedBlockingQueue; @@ -29,21 +28,13 @@ import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; -public class WorkQueue implements Closeable { - - public static RequireWorker builder() { - return WorkQueue::new; - } - - public interface RequireWorker { - WorkQueue worker(TaskManagerWorker worker); - } +public class MemoryWorkQueue implements WorkQueue { private final TaskManagerWorker worker; private final Disposable subscription; private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks; - private WorkQueue(TaskManagerWorker worker) { + public MemoryWorkQueue(TaskManagerWorker worker) { this.worker = worker; this.tasks = new LinkedBlockingQueue<>(); this.subscription = Mono.fromCallable(tasks::take) diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java index e2e7035..7c8c70a 100644 --- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java +++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java @@ -16,69 +16,13 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ - package org.apache.james.task; import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; - -import reactor.core.Disposable; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -public class WorkQueue implements Closeable { - - public static RequireWorker builder() { - return WorkQueue::new; - } - - public interface RequireWorker { - WorkQueue worker(TaskManagerWorker worker); - } - - private final TaskManagerWorker worker; - private final Disposable subscription; - private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks; - - private WorkQueue(TaskManagerWorker worker) { - this.worker = worker; - this.tasks = new LinkedBlockingQueue<>(); - this.subscription = Mono.fromCallable(tasks::take) - .repeat() - .subscribeOn(Schedulers.elastic()) - .flatMapSequential(this::dispatchTaskToWorker) - .subscribe(); - } - - private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) { - TaskWithId taskWithId = tuple.getT1(); - TaskManagerWorker.Listener listener = tuple.getT2(); - return worker.executeTask(taskWithId, listener); - } - - public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) { - try { - tasks.put(Tuples.of(taskWithId, listener)); - } catch (InterruptedException e) { - listener.cancelled(); - } - } - public void cancel(TaskId taskId) { - worker.cancelTask(taskId); - } +public interface WorkQueue extends Closeable { - @Override - public void close() throws IOException { - try { - subscription.dispose(); - } catch (Throwable ignore) { - //avoid failing during close - } - worker.close(); - } + void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener); + void cancel(TaskId taskId); } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 17c9ebd..0569dcc 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -30,10 +30,12 @@ import org.apache.james.task.eventsourcing.TaskCommand._ import scala.annotation.tailrec -class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](val eventStore: EventStore, +class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]( + val serialTaskManagerWorker: TaskManagerWorker, + val workQueue: WorkQueue, + val eventStore: EventStore, val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable { - private val workQueue: WorkQueue = WorkQueue.builder().worker(new SerialTaskManagerWorker) private val delayBetweenPollingInMs = 500 private def workDispatcher: Subscriber = { diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index b017721..057462d 100644 --- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -22,8 +22,12 @@ package org.apache.james.task.eventsourcing; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore; import org.apache.james.task.CountDownLatchExtension; +import org.apache.james.task.MemoryWorkQueue; +import org.apache.james.task.SerialTaskManagerWorker; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; +import org.apache.james.task.TaskManagerWorker; +import org.apache.james.task.WorkQueue; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @@ -35,9 +39,11 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { @BeforeEach void setUp() { + TaskManagerWorker worker = new SerialTaskManagerWorker(); + WorkQueue workQueue = new MemoryWorkQueue(worker); EventStore eventStore = new InMemoryEventStore(); TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection(); - taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection); + taskManager = new EventSourcingTaskManager(worker, workQueue, eventStore, executionDetailsProjection); } @AfterEach --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
