This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new e74439a69d [FIX] Improve DistributedTaskManagerTest stability (#1539) e74439a69d is described below commit e74439a69d475da6027e25db2697a23e413d52ac Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Tue Apr 25 22:02:22 2023 +0700 [FIX] Improve DistributedTaskManagerTest stability (#1539) - Avoid sleeps and use count down latches/awaits --- .../distributed/DistributedTaskManagerTest.java | 27 +++++++++++++++++----- .../task/eventsourcing/DecisionProjection.scala | 2 +- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index f7ac3131b5..c688dfc000 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -240,8 +240,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Test void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); + CountDownLatch latch = new CountDownLatch(1); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { - Thread.sleep(250); + latch.await(); return Task.Result.COMPLETED; })); @@ -250,6 +251,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { .block(); taskManager.cancel(id); + Awaitility.await() + .until(() -> taskManager.getExecutionDetails(id).getStatus().equals(CANCELLED)); + latch.countDown(); taskManager.await(id, TIMEOUT); assertThat(taskManager.getExecutionDetails(id).getStatus()) .isEqualTo(CANCELLED); @@ -258,8 +262,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Test void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); + CountDownLatch latch = new CountDownLatch(1); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { - Thread.sleep(250); + latch.await(); return Task.Result.COMPLETED; })); @@ -269,6 +274,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { .block()); taskManager.cancel(id); + Awaitility.await() + .until(() -> taskManager.getExecutionDetails(id).getStatus().equals(CANCELLED)); + latch.countDown(); taskManager.await(id, TIMEOUT); assertThat(taskManager.getExecutionDetails(id).getStatus()) .isEqualTo(CANCELLED); @@ -277,8 +285,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Test void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); + CountDownLatch latch = new CountDownLatch(1); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { - Thread.sleep(250); + latch.await(); return Task.Result.COMPLETED; })); @@ -286,6 +295,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD))) .block(); + latch.countDown(); taskManager.await(id, TIMEOUT); assertThat(taskManager.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.COMPLETED); @@ -294,8 +304,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Test void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws Exception { TaskManager taskManager = taskManager(HOSTNAME); + CountDownLatch latch = new CountDownLatch(1); TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { - Thread.sleep(250); + latch.await(); return Task.Result.COMPLETED; })); @@ -304,6 +315,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD))) .block()); + latch.countDown(); taskManager.await(id, TIMEOUT); assertThat(taskManager.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.COMPLETED); @@ -401,8 +413,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws Exception { TaskManager taskManager1 = taskManager(HOSTNAME); TaskManager taskManager2 = taskManager(HOSTNAME_2); + CountDownLatch latch = new CountDownLatch(1); TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> { - Thread.sleep(250); + latch.await(); return Task.Result.COMPLETED; })); @@ -411,6 +424,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { TaskManager remoteTaskManager = getOtherTaskManager(runningNode, Pair.of(HOSTNAME, taskManager1), Pair.of(HOSTNAME_2, taskManager2)).getValue(); + latch.countDown(); remoteTaskManager.await(id, TIMEOUT); assertThat(taskManager1.getExecutionDetails(id).getStatus()) .isEqualTo(TaskManager.Status.COMPLETED); @@ -602,8 +616,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { @Test void inProgressTaskShouldBeCanceledWhenCloseTaskManager() throws Exception { try (EventSourcingTaskManager taskManager = taskManager()) { + CountDownLatch latch = new CountDownLatch(1); TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> { - TimeUnit.SECONDS.sleep(5); + latch.await(); return Task.Result.COMPLETED; })); diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala index d42fb9a4ca..2d4b274529 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala @@ -28,7 +28,7 @@ case class DecisionProjection(status: Status, latestUpdateAdditionalInformationU event match { case _: Created => this case _: Started => DecisionProjection(Status.IN_PROGRESS, None) - case _: CancelRequested => DecisionProjection(Status.CANCEL_REQUESTED,latestUpdateAdditionalInformationUpdate) + case _: CancelRequested => DecisionProjection(Status.CANCEL_REQUESTED, latestUpdateAdditionalInformationUpdate) case event: Cancelled => DecisionProjection(Status.CANCELLED, event.additionalInformation.map(_.timestamp)) case event: Completed => DecisionProjection(Status.COMPLETED, event.additionalInformation.map(_.timestamp)) case event: Failed => DecisionProjection(Status.FAILED, event.additionalInformation.map(_.timestamp)) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org