This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0e376b9ca76f53548447fd4fa246bb20eb3fcb30 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jan 27 14:06:56 2023 +0700 JAMES-3694 Apply queue expiracy for the task manager Safer than auto-deletes --- .../org/apache/james/task/TaskManagerContract.java | 52 +++++++++++----------- .../distributed/RabbitMQTerminationSubscriber.java | 10 ++++- .../distributed/RabbitMQWorkQueue.java | 17 +++++-- .../RabbitMQWorkQueueReconnectionHandler.java | 11 +++-- .../TerminationReconnectionHandler.java | 11 +++-- .../distributed/RabbitMQWorkQueueSupplier.scala | 7 +-- .../distributed/DistributedTaskManagerTest.java | 43 +++++++++--------- .../RabbitMQTerminationSubscriberTest.java | 10 ++--- .../RabbitMQWorkQueuePersistenceTest.java | 8 ++-- .../distributed/RabbitMQWorkQueueTest.java | 12 ++--- .../TerminationSubscriberContract.java | 16 +++---- 11 files changed, 113 insertions(+), 84 deletions(-) diff --git a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java index 3e5b4a18ad..2a092c735a 100644 --- a/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java +++ b/server/task/task-api/src/test/java/org/apache/james/task/TaskManagerContract.java @@ -47,10 +47,10 @@ public interface TaskManagerContract { ConditionFactory awaitAtMostTwoSeconds = calmlyAwait.atMost(Duration.ofSeconds(2)); java.time.Duration TIMEOUT = java.time.Duration.ofMinutes(15); - TaskManager taskManager(); + TaskManager taskManager() throws Exception; @Test - default void submitShouldReturnATaskId() { + default void submitShouldReturnATaskId() throws Exception { TaskId taskId = taskManager().submit(new CompletedTask()); assertThat(taskId).isNotNull(); } @@ -63,7 +63,7 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) { + default void getStatusShouldReturnWaitingWhenNotYetProcessed(CountDownLatch waitingForResultLatch) throws Exception { TaskManager taskManager = taskManager(); taskManager.submit(new MemoryReferenceTask(() -> { waitingForResultLatch.await(); @@ -77,7 +77,7 @@ public interface TaskManagerContract { } @Test - default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws InterruptedException { + default void taskCodeAfterCancelIsNotRun(CountDownLatch waitingForResultLatch) throws Exception { TaskManager taskManager = taskManager(); CountDownLatch waitForTaskToBeLaunched = new CountDownLatch(1); AtomicInteger count = new AtomicInteger(0); @@ -98,7 +98,7 @@ public interface TaskManagerContract { } @Test - default void completedTaskShouldNotBeCancelled() { + default void completedTaskShouldNotBeCancelled() throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new CompletedTask()); @@ -114,7 +114,7 @@ public interface TaskManagerContract { } @Test - default void failedTaskShouldNotBeCancelled() { + default void failedTaskShouldNotBeCancelled() throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new FailedTask()); @@ -130,7 +130,7 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) { + default void getStatusShouldBeCancelledWhenCancelled(CountDownLatch countDownLatch) throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { countDownLatch.await(); @@ -152,7 +152,7 @@ public interface TaskManagerContract { } @Test - default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) { + default void aWaitingTaskShouldBeCancelled(CountDownLatch countDownLatch) throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { countDownLatch.await(); @@ -176,7 +176,7 @@ public interface TaskManagerContract { } @Test - default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) { + default void cancelShouldBeIdempotent(CountDownLatch waitingForResultLatch) throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { waitingForResultLatch.await(); @@ -189,7 +189,7 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) { + default void getStatusShouldReturnInProgressWhenProcessingIsInProgress(CountDownLatch waitingForResultLatch) throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> { waitingForResultLatch.await(); @@ -201,7 +201,7 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldReturnCompletedWhenRunSuccessfully() { + default void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit( new CompletedTask()); @@ -212,7 +212,7 @@ public interface TaskManagerContract { } @Test - default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() { + default void additionalInformationShouldBeUpdatedWhenRunSuccessfully() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> { counter.incrementAndGet(); @@ -230,7 +230,7 @@ public interface TaskManagerContract { } @Test - default void additionalInformationShouldBeUpdatedWhenFailed() { + default void additionalInformationShouldBeUpdatedWhenFailed() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new MemoryReferenceWithCounterTask(counter -> { counter.incrementAndGet(); @@ -248,7 +248,7 @@ public interface TaskManagerContract { } @Test - default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) { + default void additionalInformationShouldBeUpdatedWhenCancelled(CountDownLatch countDownLatch) throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> { counter.incrementAndGet(); @@ -273,7 +273,7 @@ public interface TaskManagerContract { } @Test - default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) { + default void additionalInformationShouldBeUpdatedDuringExecution(CountDownLatch countDownLatch) throws Exception { TaskManager taskManager = taskManager(); TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> { counter.incrementAndGet(); @@ -288,7 +288,7 @@ public interface TaskManagerContract { } @Test - default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) { + default void additionalInformationShouldBeAvailableOnAnyTaskManagerDuringExecution(CountDownLatch countDownLatch) throws Exception { TaskManager taskManager = taskManager(); TaskManager otherTaskManager = taskManager(); TaskId id = taskManager.submit(new MemoryReferenceWithCounterTask((counter) -> { @@ -312,7 +312,7 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldReturnFailedWhenRunPartially() { + default void getStatusShouldReturnFailedWhenRunPartially() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit( new FailedTask()); @@ -493,22 +493,22 @@ public interface TaskManagerContract { } @Test - default void listShouldBeEmptyWhenNoTasks() { + default void listShouldBeEmptyWhenNoTasks() throws Exception { assertThat(taskManager().list()).isEmpty(); } @Test - default void listCancelledShouldBeEmptyWhenNoTasks() { + default void listCancelledShouldBeEmptyWhenNoTasks() throws Exception { assertThat(taskManager().list(TaskManager.Status.CANCELLED)).isEmpty(); } @Test - default void listCancelRequestedShouldBeEmptyWhenNoTasks() { + default void listCancelRequestedShouldBeEmptyWhenNoTasks() throws Exception { assertThat(taskManager().list(TaskManager.Status.CANCEL_REQUESTED)).isEmpty(); } @Test - default void awaitShouldNotThrowWhenCompletedTask() throws TaskManager.ReachedTimeoutException { + default void awaitShouldNotThrowWhenCompletedTask() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new CompletedTask()); taskManager.await(taskId, TIMEOUT); @@ -516,7 +516,7 @@ public interface TaskManagerContract { } @Test - default void awaitShouldAwaitWaitingTask() throws TaskManager.ReachedTimeoutException, InterruptedException { + default void awaitShouldAwaitWaitingTask() throws Exception { TaskManager taskManager = taskManager(); CountDownLatch latch = new CountDownLatch(1); taskManager.submit(new MemoryReferenceTask( @@ -537,7 +537,7 @@ public interface TaskManagerContract { } @Test - default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() { + default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new MemoryReferenceTask( () -> { @@ -550,7 +550,7 @@ public interface TaskManagerContract { } @Test - default void submittedTaskShouldExecuteSequentially() { + default void submittedTaskShouldExecuteSequentially() throws Exception { TaskManager taskManager = taskManager(); ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); @@ -580,7 +580,7 @@ public interface TaskManagerContract { } @Test - default void awaitShouldReturnFailedWhenExceptionThrown() { + default void awaitShouldReturnFailedWhenExceptionThrown() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new ThrowingTask()); awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager); @@ -589,7 +589,7 @@ public interface TaskManagerContract { } @Test - default void getStatusShouldReturnFailedWhenExceptionThrown() { + default void getStatusShouldReturnFailedWhenExceptionThrown() throws Exception { TaskManager taskManager = taskManager(); TaskId taskId = taskManager.submit(new ThrowingTask()); awaitUntilTaskHasStatus(taskId, TaskManager.Status.FAILED, taskManager); diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 212dce28c5..fdc24021cd 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -32,6 +32,8 @@ import java.util.Optional; import javax.annotation.PreDestroy; import javax.inject.Inject; +import org.apache.james.backends.rabbitmq.QueueArguments; +import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; @@ -66,22 +68,26 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta private final JsonEventSerializer serializer; private final Sender sender; private final ReceiverProvider receiverProvider; + private final RabbitMQConfiguration rabbitMQConfiguration; private Sinks.Many<OutboundMessage> sendQueue; private Sinks.Many<Event> listener; private Disposable sendQueueHandle; private Disposable listenQueueHandle; @Inject - RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer) { + RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer, RabbitMQConfiguration rabbitMQConfiguration) { this.queueName = queueName; this.sender = sender; this.receiverProvider = receiverProvider; this.serializer = serializer; + this.rabbitMQConfiguration = rabbitMQConfiguration; } public void start() { sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); - sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block(); + QueueArguments.Builder builder = QueueArguments.builder(); + rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL); + sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block(); sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName.asString())).block(); sendQueue = Sinks.many().unicast().onBackpressureBuffer(); sendQueueHandle = sender diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index d49645d0c2..b315e9bec5 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -32,6 +32,8 @@ import java.time.Duration; import java.util.Optional; import org.apache.james.backends.rabbitmq.Constants; +import org.apache.james.backends.rabbitmq.QueueArguments; +import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.task.Task; @@ -81,6 +83,7 @@ public class RabbitMQWorkQueue implements WorkQueue { private final TaskManagerWorker worker; private final JsonTaskSerializer taskSerializer; private final RabbitMQWorkQueueConfiguration configuration; + private final RabbitMQConfiguration rabbitMQConfiguration; private final Sender sender; private final ReceiverProvider receiverProvider; private final CancelRequestQueueName cancelRequestQueueName; @@ -91,13 +94,15 @@ public class RabbitMQWorkQueue implements WorkQueue { public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer, - RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName) { + RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName, + RabbitMQConfiguration rabbitMQConfiguration) { this.cancelRequestQueueName = cancelRequestQueueName; this.worker = worker; this.receiverProvider = receiverProvider; this.sender = sender; this.taskSerializer = taskSerializer; this.configuration = configuration; + this.rabbitMQConfiguration = rabbitMQConfiguration; } @Override @@ -193,11 +198,17 @@ public class RabbitMQWorkQueue implements WorkQueue { private void listenToCancelRequests() { sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block(); - sender.declare(QueueSpecification.queue(cancelRequestQueueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block(); + QueueArguments.Builder builder = QueueArguments.builder(); + rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL); + QueueSpecification specification = QueueSpecification.queue(cancelRequestQueueName.asString()) + .durable(!DURABLE) + .autoDelete(AUTO_DELETE) + .arguments(builder.build()); + sender.declare(specification).block(); sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, cancelRequestQueueName.asString())).block(); registerCancelRequestsListener(cancelRequestQueueName.asString()); - sendCancelRequestsQueue = Sinks.many().unicast().onBackpressureBuffer(); + sendCancelRequestsQueue = Sinks.many().multicast().onBackpressureBuffer(); sendCancelRequestsQueueHandle = sender .send(sendCancelRequestsQueue.asFlux().map(this::makeCancelRequestMessage)) .subscribeOn(Schedulers.boundedElastic()) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java index 95633fb89a..6ad3f1def9 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueReconnectionHandler.java @@ -26,13 +26,14 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import javax.inject.Inject; +import org.apache.james.backends.rabbitmq.QueueArguments; +import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -42,11 +43,13 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueueReconnectionHandler.class); private final CancelRequestQueueName cancelRequestQueueName; private final EventSourcingTaskManager taskManager; + private final RabbitMQConfiguration configuration; @Inject - public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager) { + public RabbitMQWorkQueueReconnectionHandler(CancelRequestQueueName cancelRequestQueueName, EventSourcingTaskManager taskManager, RabbitMQConfiguration configuration) { this.cancelRequestQueueName = cancelRequestQueueName; this.taskManager = taskManager; + this.configuration = configuration; } @Override @@ -57,7 +60,9 @@ public class RabbitMQWorkQueueReconnectionHandler implements SimpleConnectionPoo private void createCancelQueue(Connection connection) { try (Channel channel = connection.createChannel()) { - channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); + QueueArguments.Builder builder = QueueArguments.builder(); + configuration.getQueueTTL().ifPresent(builder::queueTTL); + channel.queueDeclare(cancelRequestQueueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build()); } catch (Exception e) { LOGGER.error("Error recovering connection", e); } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java index 224d2a7f68..f1fc733057 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java @@ -26,12 +26,13 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import javax.inject.Inject; +import org.apache.james.backends.rabbitmq.QueueArguments; +import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -42,11 +43,13 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco private final TerminationQueueName queueName; private final RabbitMQTerminationSubscriber terminationSubscriber; + private final RabbitMQConfiguration configuration; @Inject - public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber) { + public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) { this.queueName = queueName; this.terminationSubscriber = terminationSubscriber; + this.configuration = configuration; } @Override @@ -57,7 +60,9 @@ public class TerminationReconnectionHandler implements SimpleConnectionPool.Reco private void createTerminationQueue(Connection connection) { try (Channel channel = connection.createChannel()) { - channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of()); + QueueArguments.Builder builder = QueueArguments.builder(); + configuration.getQueueTTL().ifPresent(builder::queueTTL); + channel.queueDeclare(queueName.asString(), !DURABLE, !EXCLUSIVE, !AUTO_DELETE, builder.build()); } catch (Exception e) { LOGGER.error("Error recovering connection", e); } diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala index 92dd8a4634..7c5d542295 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala @@ -22,7 +22,7 @@ import java.time.Duration import com.google.common.annotations.VisibleForTesting import javax.inject.Inject -import org.apache.james.backends.rabbitmq.ReceiverProvider +import org.apache.james.backends.rabbitmq.{RabbitMQConfiguration, ReceiverProvider} import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.server.task.json.JsonTaskSerializer import org.apache.james.task.SerialTaskManagerWorker @@ -33,7 +33,8 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender, private val receiverProvider: ReceiverProvider, private val jsonTaskSerializer: JsonTaskSerializer, private val cancelRequestName: CancelRequestQueueName, - private val configuration: RabbitMQWorkQueueConfiguration) extends WorkQueueSupplier { + private val configuration: RabbitMQWorkQueueConfiguration, + private val rabbitMQConfiguration: RabbitMQConfiguration) extends WorkQueueSupplier { val DEFAULT_ADDITIONAL_INFORMATION_POLLING_INTERVAL = Duration.ofSeconds(30) override def apply(eventSourcingSystem: EventSourcingSystem): RabbitMQWorkQueue = { @@ -44,7 +45,7 @@ class RabbitMQWorkQueueSupplier @Inject()(private val sender: Sender, def apply(eventSourcingSystem: EventSourcingSystem, additionalInformationPollingInterval: Duration): RabbitMQWorkQueue = { val listener = WorkerStatusListener(eventSourcingSystem) val worker = new SerialTaskManagerWorker(listener, additionalInformationPollingInterval) - val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName) + val rabbitMQWorkQueue = new RabbitMQWorkQueue(worker, sender, receiverProvider, jsonTaskSerializer, configuration, cancelRequestName, rabbitMQConfiguration) rabbitMQWorkQueue } } diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index 3d99530eb6..f7ac3131b5 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -31,6 +31,7 @@ import static org.awaitility.Awaitility.await; import static org.awaitility.Durations.FIVE_SECONDS; import static org.awaitility.Durations.ONE_SECOND; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -118,9 +119,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { private final List<RabbitMQWorkQueue> workQueues; private final RabbitMQWorkQueueSupplier supplier; - TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) { + TrackedRabbitMQWorkQueueSupplier(Sender sender, ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer) throws Exception { workQueues = new ArrayList<>(); - supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled()); + supplier = new RabbitMQWorkQueueSupplier(sender, receiverProvider, taskSerializer, CancelRequestQueueName.generate(), RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), rabbitMQExtension.getRabbitMQ().getConfiguration()); } @Override @@ -198,7 +199,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { JsonEventSerializer eventSerializer; @BeforeEach - void setUp(EventStore eventStore) { + void setUp(EventStore eventStore) throws Exception { memoryReferenceTaskStore = new MemoryReferenceTaskStore(); memoryReferenceWithCounterTaskStore = new MemoryReferenceWithCounterTaskStore(); CassandraCluster cassandra = CASSANDRA_CLUSTER.getCassandraCluster(); @@ -223,21 +224,21 @@ class DistributedTaskManagerTest implements TaskManagerContract { .forEach(queue -> managementAPI.deleteQueue("/", queue.getName())); } - public EventSourcingTaskManager taskManager() { + public EventSourcingTaskManager taskManager() throws Exception { return taskManager(HOSTNAME); } - EventSourcingTaskManager taskManager(Hostname hostname) { + EventSourcingTaskManager taskManager(Hostname hostname) throws Exception { RabbitMQTerminationSubscriber terminationSubscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), - eventSerializer); + eventSerializer, rabbitMQExtension.getRabbitMQ().getConfiguration()); terminationSubscribers.add(terminationSubscriber); terminationSubscriber.start(); return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, terminationSubscriber); } @Test - void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException { + void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { Thread.sleep(250); @@ -255,7 +256,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException { + void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { Thread.sleep(250); @@ -274,7 +275,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException { + void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { Thread.sleep(250); @@ -291,7 +292,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException { + void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { Thread.sleep(250); @@ -309,7 +310,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() { + void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() throws Exception { try (EventSourcingTaskManager taskManager1 = taskManager(); EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { TaskId taskId = taskManager1.submit(new CompletedTask()); @@ -325,7 +326,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() { + void givenTwoTaskManagersAndTwoTaskOnlyOneTaskShouldRunAtTheSameTime() throws Exception { CountDownLatch waitingForFirstTaskLatch = new CountDownLatch(1); try (EventSourcingTaskManager taskManager1 = taskManager(); @@ -348,7 +349,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws InterruptedException { + void givenTwoTaskManagerATaskSubmittedOnOneCouldBeRunOnTheOther() throws Exception { try (EventSourcingTaskManager taskManager1 = taskManager()) { Thread.sleep(100); try (EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { @@ -368,7 +369,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) { + void givenTwoTaskManagerATaskRunningOnOneShouldBeCancellableFromTheOtherOne(CountDownLatch countDownLatch) throws Exception { TaskManager taskManager1 = taskManager(HOSTNAME); TaskManager taskManager2 = taskManager(HOSTNAME_2); TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> { @@ -397,7 +398,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws TaskManager.ReachedTimeoutException { + void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws Exception { TaskManager taskManager1 = taskManager(HOSTNAME); TaskManager taskManager2 = taskManager(HOSTNAME_2); TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> { @@ -424,7 +425,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() { + void givenTwoTaskManagerAndATaskRanPerTaskManagerListingThemOnEachShouldShowBothTasks() throws Exception { try (EventSourcingTaskManager taskManager1 = taskManager(); EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { @@ -449,7 +450,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) { + void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception { try (EventSourcingTaskManager taskManager1 = taskManager(); EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2); @@ -477,7 +478,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void shouldNotCrashWhenBadMessage() { + void shouldNotCrashWhenBadMessage() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); taskManager.submit(new FailsDeserializationTask()); @@ -488,7 +489,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void shouldNotCrashWhenBadMessages() { + void shouldNotCrashWhenBadMessages() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask())); @@ -586,7 +587,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) { + void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) throws Exception { TaskManager taskManager = taskManager(HOSTNAME); TaskId taskId = taskManager.submit(new CassandraExecutingTask(cassandra.getConf(), true)); @@ -599,7 +600,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void inProgressTaskShouldBeCanceledWhenCloseTaskManager() { + void inProgressTaskShouldBeCanceledWhenCloseTaskManager() throws Exception { try (EventSourcingTaskManager taskManager = taskManager()) { TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> { TimeUnit.SECONDS.sleep(5); diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java index 2f53672adb..f1a868f97f 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java @@ -65,15 +65,15 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract .isolationPolicy(WEAK); @Override - public TerminationSubscriber subscriber() { + public TerminationSubscriber subscriber() throws Exception { RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(TerminationQueueName.generate(), rabbitMQExtension.getSender(), - rabbitMQExtension.getReceiverProvider(), SERIALIZER); + rabbitMQExtension.getReceiverProvider(), SERIALIZER, rabbitMQExtension.getRabbitMQ().getConfiguration()); subscriber.start(); return subscriber; } @Test - void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() { + void givenTwoTerminationSubscribersWhenAnEventIsSentItShouldBeReceivedByBoth() throws Exception { TerminationSubscriber subscriber1 = subscriber(); TerminationSubscriber subscriber2 = subscriber(); @@ -94,7 +94,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract } @Test - void eventProcessingShouldNotCrashOnInvalidMessage() { + void eventProcessingShouldNotCrashOnInvalidMessage() throws Exception { TerminationSubscriber subscriber1 = subscriber(); Flux<Event> firstListener = Flux.from(subscriber1.listenEvents()); @@ -113,7 +113,7 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract } @Test - void eventProcessingShouldNotCrashOnInvalidMessages() { + void eventProcessingShouldNotCrashOnInvalidMessages() throws Exception { TerminationSubscriber subscriber1 = subscriber(); Flux<Event> firstListener = Flux.from(subscriber1.listenEvents()); diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java index 6a964af193..81b56e5bdb 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueuePersistenceTest.java @@ -50,10 +50,10 @@ class RabbitMQWorkQueuePersistenceTest { private JsonTaskSerializer serializer; @BeforeEach - void setUp() { + void setUp() throws Exception { worker = spy(new ImmediateWorker()); serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore())); - testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate()); + testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration()); //declare the queue but do not start consuming from it testee.declareQueue(); } @@ -90,9 +90,9 @@ class RabbitMQWorkQueuePersistenceTest { assertThat(worker.results).containsExactly(Task.Result.COMPLETED); } - private void startNewConsumingWorkqueue() { + private void startNewConsumingWorkqueue() throws Exception { worker = spy(new ImmediateWorker()); - testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate()); + testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration()); testee.start(); } } diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java index 8f83aa5a95..45265689b0 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java @@ -66,10 +66,10 @@ class RabbitMQWorkQueueTest { @BeforeEach - void setUp() { + void setUp() throws Exception { worker = new ImmediateWorker(); serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore())); - testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate()); + testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration()); testee.start(); } @@ -96,11 +96,11 @@ class RabbitMQWorkQueueTest { } @Test - void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() { + void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() throws Exception { testee.submit(TASK_WITH_ID); ImmediateWorker otherTaskManagerWorker = new ImmediateWorker(); - try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) { + try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer, RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) { otherWorkQueue.start(); IntStream.range(0, 9) @@ -112,7 +112,7 @@ class RabbitMQWorkQueueTest { } @Test - void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws InterruptedException { + void givenANonDeserializableTaskItShouldBeFlaggedAsFailedAndItDoesNotPreventFollowingTasks() throws Exception { Task task = new TestTask(42); TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd"); TaskWithId taskWithId = new TaskWithId(taskId, task); @@ -120,7 +120,7 @@ class RabbitMQWorkQueueTest { ImmediateWorker otherTaskManagerWorker = new ImmediateWorker(); JsonTaskSerializer otherTaskSerializer = JsonTaskSerializer.of(TestTaskDTOModules.TEST_TYPE); try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), otherTaskSerializer, - RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate())) { + RabbitMQWorkQueueConfiguration$.MODULE$.enabled(), CancelRequestQueueName.generate(), rabbitMQExtension.getRabbitMQ().getConfiguration())) { //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue. Thread.sleep(200); otherWorkQueue.start(); diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java index 63d9625c8e..1a2c801292 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java @@ -53,10 +53,10 @@ public interface TerminationSubscriberContract { Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50); ExecutorService EXECUTOR = Executors.newCachedThreadPool(); - TerminationSubscriber subscriber(); + TerminationSubscriber subscriber() throws Exception; @Test - default void handlingCompletedShouldBeListed() { + default void handlingCompletedShouldBeListed() throws Exception { TerminationSubscriber subscriber = subscriber(); sendEvents(subscriber, COMPLETED_EVENT); @@ -65,7 +65,7 @@ public interface TerminationSubscriberContract { } @Test - default void handlingFailedShouldBeListed() { + default void handlingFailedShouldBeListed() throws Exception { TerminationSubscriber subscriber = subscriber(); sendEvents(subscriber, FAILED_EVENT); @@ -74,7 +74,7 @@ public interface TerminationSubscriberContract { } @Test - default void handlingCancelledShouldBeListed() { + default void handlingCancelledShouldBeListed() throws Exception { TerminationSubscriber subscriber = subscriber(); sendEvents(subscriber, CANCELLED_EVENT); @@ -83,7 +83,7 @@ public interface TerminationSubscriberContract { } @Test - default void handlingNonTerminalEventShouldNotBeListed() { + default void handlingNonTerminalEventShouldNotBeListed() throws Exception { TerminationSubscriber subscriber = subscriber(); TaskEvent event = new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo")); @@ -93,7 +93,7 @@ public interface TerminationSubscriberContract { } @Test - default void handlingMultipleEventsShouldBeListed() { + default void handlingMultipleEventsShouldBeListed() throws Exception { TerminationSubscriber subscriber = subscriber(); sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); @@ -102,7 +102,7 @@ public interface TerminationSubscriberContract { } @Test - default void multipleListeningEventsShouldShareEvents() { + default void multipleListeningEventsShouldShareEvents() throws Exception { TerminationSubscriber subscriber = subscriber(); Flux<Event> firstListener = Flux.from(subscriber.listenEvents()); @@ -122,7 +122,7 @@ public interface TerminationSubscriberContract { } @Test - default void dynamicListeningEventsShouldGetOnlyNewEvents() { + default void dynamicListeningEventsShouldGetOnlyNewEvents() throws Exception { TerminationSubscriber subscriber = subscriber(); sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org