This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4289da532648f9145f585273607d58f09a4a2bbf Author: Rémi Kowalski <[email protected]> AuthorDate: Wed Jul 31 18:00:48 2019 +0200 JAMES-2813 remove listener from submit task to the worker --- .../org/apache/james/CassandraTaskManagerTest.java | 28 ++++++++----- .../apache/james/DistributedTaskManagerModule.java | 14 +++++++ .../org/apache/james/task/MemoryTaskManager.java | 43 +++++++++++--------- .../org/apache/james/task/MemoryWorkQueue.java | 16 +++----- .../apache/james/task/SerialTaskManagerWorker.java | 24 ++++++----- .../org/apache/james/task/TaskManagerWorker.java | 12 +++--- .../main/java/org/apache/james/task/WorkQueue.java | 2 +- .../eventsourcing/EventSourcingTaskManager.scala | 47 +++++++++++----------- .../task/eventsourcing/WorkQueueSupplier.scala} | 23 +++++------ .../task/eventsourcing/WorkerStatusListener.scala | 46 ++++++++++----------- .../james/task/SerialTaskManagerWorkerTest.java | 42 +++++++++---------- .../EventSourcingTaskManagerTest.java | 10 +++-- 12 files changed, 169 insertions(+), 138 deletions(-) diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java index 82c3f2c..72cd9ff 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java @@ -19,6 +19,11 @@ package org.apache.james; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -31,27 +36,26 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.server.task.json.dto.TestTaskDTOModules; import org.apache.james.task.CompletedTask; +import org.apache.james.task.MemoryWorkQueue; +import org.apache.james.task.SerialTaskManagerWorker; import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; +import org.apache.james.task.TaskManagerWorker; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.WorkQueueSupplier; +import org.apache.james.task.eventsourcing.WorkerStatusListener; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule; import org.apache.james.task.eventsourcing.cassandra.TasksSerializationModule; - -import com.github.steveash.guavate.Guavate; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import static org.assertj.core.api.Assertions.assertThat; +import com.github.steveash.guavate.Guavate; class CassandraTaskManagerTest { private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE); @@ -73,8 +77,14 @@ class CassandraTaskManagerTest { CassandraCluster cassandra = cassandraCluster.getCassandraCluster(); CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider()); TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO); - TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection); - TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection); + + WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> { + WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem); + TaskManagerWorker worker = new SerialTaskManagerWorker(listener); + return new MemoryWorkQueue(worker); + }; + TaskManager taskManager1 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection); + TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection); TaskId taskId = taskManager1.submit(new CompletedTask()); Awaitility.await() diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java index 7a6391f..6f1a294 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java @@ -20,20 +20,34 @@ package org.apache.james; +import org.apache.james.task.MemoryWorkQueue; +import org.apache.james.task.SerialTaskManagerWorker; import org.apache.james.task.TaskManager; +import org.apache.james.task.TaskManagerWorker; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.WorkQueueSupplier; +import org.apache.james.task.eventsourcing.WorkerStatusListener; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; import com.google.inject.AbstractModule; import com.google.inject.Scopes; public class DistributedTaskManagerModule extends AbstractModule { + + public static final WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> { + WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem); + TaskManagerWorker worker = new SerialTaskManagerWorker(listener); + return new MemoryWorkQueue(worker); + }; + @Override protected void configure() { bind(TaskExecutionDetailsProjection.class).in(Scopes.SINGLETON); bind(TaskManager.class).in(Scopes.SINGLETON); + bind(WorkQueueSupplier.class).in(Scopes.SINGLETON); bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class); bind(TaskManager.class).to(EventSourcingTaskManager.class); + bind(WorkQueueSupplier.class).toInstance(workQueueSupplier); } } diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index ad7bd9d..f972e39 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -36,38 +36,45 @@ import reactor.core.scheduler.Schedulers; public class MemoryTaskManager implements TaskManager { + @FunctionalInterface + private interface TaskExecutionDetailsUpdaterFactory { + Consumer<TaskExecutionDetailsUpdater> apply(TaskId taskId); + } + private static class DetailsUpdater implements TaskManagerWorker.Listener { - private final Consumer<TaskExecutionDetailsUpdater> updater; + private final TaskExecutionDetailsUpdaterFactory updaterFactory; - DetailsUpdater(Consumer<TaskExecutionDetailsUpdater> updater) { - this.updater = updater; + DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory) { + this.updaterFactory = updaterFactory; } @Override - public void started() { - updater.accept(TaskExecutionDetails::started); + public void started(TaskId taskId) { + updaterFactory.apply(taskId).accept(TaskExecutionDetails::started); } @Override - public void completed(Task.Result result) { - updater.accept(TaskExecutionDetails::completed); - + public void completed(TaskId taskId, Task.Result result) { + updaterFactory.apply(taskId) + .accept(TaskExecutionDetails::completed); } @Override - public void failed(Throwable t) { - failed(); + public void failed(TaskId taskId, Throwable t) { + failed(taskId); } @Override - public void failed() { - updater.accept(TaskExecutionDetails::failed); + public void failed(TaskId taskId) { + updaterFactory.apply(taskId) + .accept(TaskExecutionDetails::failed); } @Override - public void cancelled() { - updater.accept(TaskExecutionDetails::cancelEffectively); + public void cancelled(TaskId taskId) { + updaterFactory.apply(taskId) + .accept(TaskExecutionDetails::cancelEffectively); } } @@ -80,7 +87,7 @@ public class MemoryTaskManager implements TaskManager { public MemoryTaskManager() { idToExecutionDetails = new ConcurrentHashMap<>(); - worker = new SerialTaskManagerWorker(); + worker = new SerialTaskManagerWorker(updater()); workQueue = new MemoryWorkQueue(worker); } @@ -88,7 +95,7 @@ public class MemoryTaskManager implements TaskManager { TaskId taskId = TaskId.generateTaskId(); TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId); idToExecutionDetails.put(taskId, executionDetails); - workQueue.submit(new TaskWithId(taskId, task), updater(taskId)); + workQueue.submit(new TaskWithId(taskId, task)); return taskId; } @@ -148,8 +155,8 @@ public class MemoryTaskManager implements TaskManager { } } - private DetailsUpdater updater(TaskId id) { - return new DetailsUpdater(updateDetails(id)); + private DetailsUpdater updater() { + return new DetailsUpdater(this::updateDetails); } private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) { diff --git a/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java index 8a720e6..cea8b04 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java @@ -25,14 +25,12 @@ import java.util.concurrent.LinkedBlockingQueue; import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; public class MemoryWorkQueue implements WorkQueue { private final TaskManagerWorker worker; private final Disposable subscription; - private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks; + private final LinkedBlockingQueue<TaskWithId> tasks; public MemoryWorkQueue(TaskManagerWorker worker) { this.worker = worker; @@ -44,17 +42,15 @@ public class MemoryWorkQueue implements WorkQueue { .subscribe(); } - private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) { - TaskWithId taskWithId = tuple.getT1(); - TaskManagerWorker.Listener listener = tuple.getT2(); - return worker.executeTask(taskWithId, listener); + private Mono<?> dispatchTaskToWorker(TaskWithId taskWithId) { + return worker.executeTask(taskWithId); } - public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) { + public void submit(TaskWithId taskWithId) { try { - tasks.put(Tuples.of(taskWithId, listener)); + tasks.put(taskWithId); } catch (InterruptedException e) { - listener.cancelled(); + worker.cancelTask(taskWithId.getId()); } } diff --git a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 4736372..afb7a39 100644 --- a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -45,19 +45,21 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class); private final ExecutorService taskExecutor; + private final Listener listener; private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask; private final Semaphore semaphore; private final Set<TaskId> cancelledTasks; - public SerialTaskManagerWorker() { + public SerialTaskManagerWorker(Listener listener) { this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")); + this.listener = listener; this.cancelledTasks = Sets.newConcurrentHashSet(); this.runningTask = new AtomicReference<>(); this.semaphore = new Semaphore(1); } @Override - public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener) { + public Mono<Task.Result> executeTask(TaskWithId taskWithId) { return Mono .using( acquireSemaphore(taskWithId, listener), @@ -72,7 +74,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { semaphore.acquire(); return semaphore; } catch (InterruptedException e) { - listener.cancelled(); + listener.cancelled(taskWithId.getId()); throw e; } }; @@ -87,14 +89,14 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { return Mono.fromFuture(future) .doOnError(exception -> { if (exception instanceof CancellationException) { - listener.cancelled(); + listener.cancelled(taskWithId.getId()); } else { - listener.failed(exception); + listener.failed(taskWithId.getId(), exception); } }) .onErrorReturn(Task.Result.PARTIAL); } else { - listener.cancelled(); + listener.cancelled(taskWithId.getId()); return Mono.empty(); } }; @@ -110,21 +112,21 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { } private Task.Result run(TaskWithId taskWithId, Listener listener) { - listener.started(); + listener.started(taskWithId.getId()); try { return taskWithId.getTask() .run() - .onComplete(listener::completed) + .onComplete(result -> listener.completed(taskWithId.getId(), result)) .onFailure(() -> { LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); - listener.failed(); + listener.failed(taskWithId.getId()); }); } catch (InterruptedException e) { - listener.cancelled(); + listener.cancelled(taskWithId.getId()); return Task.Result.PARTIAL; } catch (Exception e) { LOGGER.error("Error while running task {}", taskWithId.getId(), e); - listener.failed(e); + listener.failed(taskWithId.getId(), e); return Task.Result.PARTIAL; } } diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java index cacc6c8..4780cc7 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java +++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java @@ -25,18 +25,18 @@ import reactor.core.publisher.Mono; public interface TaskManagerWorker extends Closeable { interface Listener { - void started(); + void started(TaskId taskId); - void completed(Task.Result result); + void completed(TaskId taskId, Task.Result result); - void failed(Throwable t); + void failed(TaskId taskId, Throwable t); - void failed(); + void failed(TaskId taskId); - void cancelled(); + void cancelled(TaskId taskId); } - Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener); + Mono<Task.Result> executeTask(TaskWithId taskWithId); void cancelTask(TaskId taskId); } diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java index 7c8c70a..ae363ff 100644 --- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java +++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java @@ -22,7 +22,7 @@ import java.io.Closeable; public interface WorkQueue extends Closeable { - void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener); + void submit(TaskWithId taskWithId); void cancel(TaskId taskId); } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 0569dcc..34f55cf 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -1,21 +1,21 @@ /** ************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - * ***************************************************************/ + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ package org.apache.james.task.eventsourcing import java.io.Closeable @@ -23,25 +23,24 @@ import java.util import com.google.common.annotations.VisibleForTesting import javax.inject.Inject -import org.apache.james.eventsourcing.{AggregateId, Subscriber} import org.apache.james.eventsourcing.eventstore.{EventStore, History} +import org.apache.james.eventsourcing.{AggregateId, Subscriber} import org.apache.james.task._ import org.apache.james.task.eventsourcing.TaskCommand._ import scala.annotation.tailrec class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]( - val serialTaskManagerWorker: TaskManagerWorker, - val workQueue: WorkQueue, - val eventStore: EventStore, - val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable { + workQueueSupplier: WorkQueueSupplier, + val eventStore: EventStore, + val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable { private val delayBetweenPollingInMs = 500 private def workDispatcher: Subscriber = { case Created(aggregateId, _, task) => val taskWithId = new TaskWithId(aggregateId.taskId, task) - workQueue.submit(taskWithId, new WorkerStatusListener(taskWithId.getId, eventSourcingSystem)) + workQueue.submit(taskWithId) case CancelRequested(aggregateId, _) => workQueue.cancel(aggregateId.taskId) case _ => @@ -63,6 +62,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] workDispatcher), eventStore = eventStore) + private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem) + override def submit(task: Task): TaskId = { val taskId = TaskId.generateTaskId val command = Create(taskId, task) diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala similarity index 61% copy from server/task/src/main/java/org/apache/james/task/WorkQueue.java copy to server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala index 7c8c70a..0265359 100644 --- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala @@ -1,4 +1,4 @@ -/**************************************************************** +/** ************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -6,23 +6,22 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - ****************************************************************/ -package org.apache.james.task; + * ***************************************************************/ +package org.apache.james.task.eventsourcing -import java.io.Closeable; +import org.apache.james.eventsourcing.EventSourcingSystem +import org.apache.james.task.WorkQueue -public interface WorkQueue extends Closeable { - - void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener); - - void cancel(TaskId taskId); +@FunctionalInterface +trait WorkQueueSupplier { + def apply(eventSourcingSystem: EventSourcingSystem): WorkQueue } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala index 7e1ca87..2b08857 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala @@ -1,21 +1,21 @@ /** ************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - * ***************************************************************/ + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ package org.apache.james.task.eventsourcing @@ -24,15 +24,15 @@ import org.apache.james.task.Task.Result import org.apache.james.task.eventsourcing.TaskCommand._ import org.apache.james.task.{TaskId, TaskManagerWorker} -class WorkerStatusListener(taskId: TaskId, eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener { +case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener { - override def started(): Unit = eventSourcingSystem.dispatch(Start(taskId)) + override def started(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Start(taskId)) - override def completed(result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result)) + override def completed(taskId: TaskId, result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result)) - override def failed(t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId)) + override def failed(taskId: TaskId, t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId)) - override def failed(): Unit = eventSourcingSystem.dispatch(Fail(taskId)) + override def failed(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Fail(taskId)) - override def cancelled(): Unit = eventSourcingSystem.dispatch(Cancel(taskId)) + override def cancelled(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Cancel(taskId)) } \ No newline at end of file diff --git a/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 2aa1dc0..522ed0c 100644 --- a/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -20,6 +20,7 @@ package org.apache.james.task; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -32,18 +33,25 @@ import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; class SerialTaskManagerWorkerTest { - - private final SerialTaskManagerWorker worker = new SerialTaskManagerWorker(); + private TaskManagerWorker.Listener listener; + private SerialTaskManagerWorker worker; private final Task successfulTask = new CompletedTask(); private final Task failedTask = new FailedTask(); private final Task throwingTask = new ThrowingTask(); + @BeforeEach + void beforeEach() { + listener = mock(TaskManagerWorker.Listener.class); + worker = new SerialTaskManagerWorker(listener); + } + @AfterEach void tearDown() throws IOException { worker.close(); @@ -53,35 +61,31 @@ class SerialTaskManagerWorkerTest { void aSuccessfullTaskShouldCompleteSuccessfully() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.successfulTask); - TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - - Mono<Task.Result> result = worker.executeTask(taskWithId, listener); + Mono<Task.Result> result = worker.executeTask(taskWithId); assertThat(result.block()).isEqualTo(Task.Result.COMPLETED); - verify(listener, atLeastOnce()).completed(Task.Result.COMPLETED); + verify(listener, atLeastOnce()).completed(taskWithId.getId(), Task.Result.COMPLETED); } @Test void aFailedTaskShouldCompleteWithFailedStatus() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), failedTask); - TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - Mono<Task.Result> result = worker.executeTask(taskWithId, listener); + Mono<Task.Result> result = worker.executeTask(taskWithId); assertThat(result.block()).isEqualTo(Task.Result.PARTIAL); - verify(listener, atLeastOnce()).failed(); + verify(listener, atLeastOnce()).failed(taskWithId.getId()); } @Test void aThrowingTaskShouldCompleteWithFailedStatus() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), throwingTask); - TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - Mono<Task.Result> result = worker.executeTask(taskWithId, listener); + Mono<Task.Result> result = worker.executeTask(taskWithId); assertThat(result.block()).isEqualTo(Task.Result.PARTIAL); - verify(listener, atLeastOnce()).failed(any(RuntimeException.class)); + verify(listener, atLeastOnce()).failed(eq(taskWithId.getId()), any(RuntimeException.class)); } @Test @@ -98,12 +102,10 @@ class SerialTaskManagerWorkerTest { TaskWithId taskWithId = new TaskWithId(id, inProgressTask); - TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - - worker.executeTask(taskWithId, listener).subscribe(); + worker.executeTask(taskWithId).subscribe(); await(taskLaunched); - verify(listener, atLeastOnce()).started(); + verify(listener, atLeastOnce()).started(id); verifyNoMoreInteractions(listener); latch.countDown(); } @@ -122,19 +124,17 @@ class SerialTaskManagerWorkerTest { TaskWithId taskWithId = new TaskWithId(id, inProgressTask); - TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class); - - Mono<Task.Result> resultMono = worker.executeTask(taskWithId, listener).cache(); + Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache(); resultMono.subscribe(); Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS) - .untilAsserted(() -> verify(listener, atLeastOnce()).started()); + .untilAsserted(() -> verify(listener, atLeastOnce()).started(id)); worker.cancelTask(id); resultMono.block(Duration.ofSeconds(10)); - verify(listener, atLeastOnce()).cancelled(); + verify(listener, atLeastOnce()).cancelled(id); verifyNoMoreInteractions(listener); } diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index 057462d..63c62b0 100644 --- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -27,7 +27,6 @@ import org.apache.james.task.SerialTaskManagerWorker; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; import org.apache.james.task.TaskManagerWorker; -import org.apache.james.task.WorkQueue; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @@ -39,11 +38,14 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { @BeforeEach void setUp() { - TaskManagerWorker worker = new SerialTaskManagerWorker(); - WorkQueue workQueue = new MemoryWorkQueue(worker); EventStore eventStore = new InMemoryEventStore(); TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection(); - taskManager = new EventSourcingTaskManager(worker, workQueue, eventStore, executionDetailsProjection); + WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> { + WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem); + TaskManagerWorker worker = new SerialTaskManagerWorker(listener); + return new MemoryWorkQueue(worker); + }; + taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection); } @AfterEach --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
