This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aebfaa90b906e656641f9348844187aa79e44d00 Author: LanKhuat <dlkh...@linagora.com> AuthorDate: Tue Jul 14 14:23:43 2020 +0700 JAMES-3305 Task manager deserialization error handling --- .../distributed/RabbitMQWorkQueue.java | 19 ++-- .../distributed/DistributedTaskManagerTest.java | 122 ++++++++++++++++++++- .../distributed/RabbitMQWorkQueueTest.java | 5 +- 3 files changed, 133 insertions(+), 13 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index f1daf1f..6f5e22e 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -21,6 +21,7 @@ package org.apache.james.task.eventsourcing.distributed; import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; +import static org.apache.james.backends.rabbitmq.Constants.REQUEUE; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -129,13 +130,15 @@ public class RabbitMQWorkQueue implements WorkQueue { } private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) { - TaskId taskId = TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString()); - return Mono.fromCallable(() -> { - delivery.ack(); - return new String(delivery.getBody(), StandardCharsets.UTF_8); - }).flatMap(json -> - deserialize(json, taskId) - .flatMap(task -> executeOnWorker(taskId, task))); + return Mono.fromCallable(() -> TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString())) + .flatMap(taskId -> deserialize(new String(delivery.getBody(), StandardCharsets.UTF_8), taskId) + .doOnNext(task -> delivery.ack()) + .flatMap(task -> executeOnWorker(taskId, task))) + .onErrorResume(error -> { + LOGGER.error("Unable to process {} {}", TASK_ID, delivery.getProperties().getHeaders().get(TASK_ID), error); + delivery.nack(!REQUEUE); + return Mono.empty(); + }); } private Mono<Task> deserialize(String json, TaskId taskId) { @@ -225,4 +228,4 @@ public class RabbitMQWorkQueue implements WorkQueue { Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose); Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close); } -} +} \ No newline at end of file diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index db84283..37e4b7d 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -19,17 +19,27 @@ package org.apache.james.task.eventsourcing.distributed; +import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; +import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; +import static org.apache.james.backends.cassandra.Scenario.Builder.fail; +import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME; +import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.ROUTING_KEY; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Duration.FIVE_SECONDS; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.Scenario; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; @@ -56,6 +66,8 @@ import org.apache.james.server.task.json.dto.TaskDTOModule; import org.apache.james.server.task.json.dto.TestTaskDTOModules; import org.apache.james.task.CompletedTask; import org.apache.james.task.CountDownLatchExtension; +import org.apache.james.task.FailedTask; +import org.apache.james.task.FailsDeserializationTask; import org.apache.james.task.Hostname; import org.apache.james.task.MemoryReferenceTask; import org.apache.james.task.Task; @@ -63,6 +75,7 @@ import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; +import org.apache.james.task.TaskWithId; import org.apache.james.task.WorkQueue; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; @@ -78,9 +91,13 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import com.rabbitmq.client.AMQP; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.Sender; class DistributedTaskManagerTest implements TaskManagerContract { @@ -114,6 +131,9 @@ class DistributedTaskManagerTest implements TaskManagerContract { static final DTOConverter<TaskExecutionDetails.AdditionalInformation, AdditionalInformationDTO> TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER = DTOConverter.of(ADDITIONAL_INFORMATION_MODULE); static final Hostname HOSTNAME = new Hostname("foo"); static final Hostname HOSTNAME_2 = new Hostname("bar"); + static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd"); + static final Task TASK = new CompletedTask(); + static final TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK); @RegisterExtension static final RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ(); @@ -132,6 +152,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules = ImmutableSet.of( + TestTaskDTOModules.FAILS_DESERIALIZATION_TASK_MODULE, TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.FAILED_TASK_MODULE, TestTaskDTOModules.THROWING_TASK_MODULE, @@ -337,7 +358,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test - void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception { + void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) { try (EventSourcingTaskManager taskManager1 = taskManager(); EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2); @@ -364,6 +385,105 @@ class DistributedTaskManagerTest implements TaskManagerContract { } } + @Test + void shouldNotCrashWhenBadMessage() { + TaskManager taskManager = taskManager(HOSTNAME); + + taskManager.submit(new FailsDeserializationTask()); + + TaskId id = taskManager.submit(TASK); + + awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED, taskManager); + } + + @Test + void shouldNotCrashWhenBadMessages() { + TaskManager taskManager = taskManager(HOSTNAME); + + IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask())); + + TaskId id = taskManager.submit(TASK); + + awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED, taskManager); + } + + @Test + void shouldNotCrashWhenInvalidHeader() throws Exception { + TaskManager taskManager = taskManager(HOSTNAME); + + AMQP.BasicProperties badProperties = new AMQP.BasicProperties.Builder() + .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode()) + .priority(PERSISTENT_TEXT_PLAIN.getPriority()) + .contentType(PERSISTENT_TEXT_PLAIN.getContentType()) + .headers(ImmutableMap.of("abc", TASK_WITH_ID.getId().asString())) + .build(); + + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, + ROUTING_KEY, badProperties, taskSerializer.serialize(TASK_WITH_ID.getTask()).getBytes(StandardCharsets.UTF_8)))) + .block(); + + TaskId taskId = taskManager.submit(TASK); + + await().atMost(FIVE_SECONDS).until(() -> taskManager.list(TaskManager.Status.COMPLETED).size() == 1); + + assertThat(taskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test + void shouldNotCrashWhenInvalidTaskId() throws Exception { + TaskManager taskManager = taskManager(HOSTNAME); + + AMQP.BasicProperties badProperties = new AMQP.BasicProperties.Builder() + .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode()) + .priority(PERSISTENT_TEXT_PLAIN.getPriority()) + .contentType(PERSISTENT_TEXT_PLAIN.getContentType()) + .headers(ImmutableMap.of("taskId", "BAD_ID")) + .build(); + + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, + ROUTING_KEY, badProperties, taskSerializer.serialize(TASK_WITH_ID.getTask()).getBytes(StandardCharsets.UTF_8)))) + .block(); + + TaskId taskId = taskManager.submit(TASK); + + await().atMost(FIVE_SECONDS).until(() -> taskManager.list(TaskManager.Status.COMPLETED).size() == 1); + + assertThat(taskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test + void shouldNotCrashWhenErrorHandlingFails(CassandraCluster cassandra) throws Exception { + TaskManager taskManager = taskManager(HOSTNAME); + + cassandra.getConf().printStatements(); + cassandra.getConf().registerScenario(Scenario.combine( + executeNormally() + .times(2) // submit + inProgress + .whenQueryStartsWith("INSERT INTO eventStore"), + executeNormally() + .times(2) // submit + inProgress + .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection"), + fail() + .forever() + .whenQueryStartsWith("INSERT INTO eventStore"), + fail() + .forever() + .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection"))); + taskManager.submit(new FailedTask()); + + Thread.sleep(1000); + + cassandra.getConf().registerScenario(Scenario.NOTHING); + + TaskId id2 = taskManager.submit(new CompletedTask()); + + awaitUntilTaskHasStatus(id2, TaskManager.Status.COMPLETED, taskManager); + } + private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager, Hostname node) { return hostNameByTaskManager .values() diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java index 4eb0323..5832863 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java @@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS; import static org.awaitility.Duration.TWO_SECONDS; -import static org.mockito.Mockito.spy; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @@ -67,7 +66,7 @@ class RabbitMQWorkQueueTest { @BeforeEach void setUp() { - worker = spy(new ImmediateWorker()); + worker = new ImmediateWorker(); serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore())); testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer); testee.start(); @@ -160,7 +159,5 @@ class RabbitMQWorkQueueTest { assertThatThrownBy(() -> await().atMost(FIVE_HUNDRED_MILLISECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).isInstanceOf(ConditionTimeoutException.class); assertThatCode(() -> await().atMost(TWO_SECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).doesNotThrowAnyException(); - } - } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org