This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aebfaa90b906e656641f9348844187aa79e44d00
Author: LanKhuat <dlkh...@linagora.com>
AuthorDate: Tue Jul 14 14:23:43 2020 +0700

    JAMES-3305 Task manager deserialization error handling
---
 .../distributed/RabbitMQWorkQueue.java             |  19 ++--
 .../distributed/DistributedTaskManagerTest.java    | 122 ++++++++++++++++++++-
 .../distributed/RabbitMQWorkQueueTest.java         |   5 +-
 3 files changed, 133 insertions(+), 13 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index f1daf1f..6f5e22e 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -21,6 +21,7 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -129,13 +130,15 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
-        TaskId taskId = TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString());
-        return Mono.fromCallable(() -> {
-            delivery.ack();
-            return new String(delivery.getBody(), StandardCharsets.UTF_8);
-        }).flatMap(json ->
-            deserialize(json, taskId)
-                .flatMap(task -> executeOnWorker(taskId, task)));
+        return Mono.fromCallable(() -> TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString()))
+            .flatMap(taskId -> deserialize(new String(delivery.getBody(), StandardCharsets.UTF_8), taskId)
+                .doOnNext(task -> delivery.ack())
+                .flatMap(task -> executeOnWorker(taskId, task)))
+            .onErrorResume(error -> {
+                LOGGER.error("Unable to process {} {}", TASK_ID, delivery.getProperties().getHeaders().get(TASK_ID), error);
+                delivery.nack(!REQUEUE);
+                return Mono.empty();
+            });
     }
 
     private Mono<Task> deserialize(String json, TaskId taskId) {
@@ -225,4 +228,4 @@ public class RabbitMQWorkQueue implements WorkQueue {
         Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
         Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close);
     }
-}
+}
\ No newline at end of file
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index db84283..37e4b7d 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -19,17 +19,27 @@
 
 package org.apache.james.task.eventsourcing.distributed;
 
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.ROUTING_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Duration.FIVE_SECONDS;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.Scenario;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -56,6 +66,8 @@ import org.apache.james.server.task.json.dto.TaskDTOModule;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
 import org.apache.james.task.CountDownLatchExtension;
+import org.apache.james.task.FailedTask;
+import org.apache.james.task.FailsDeserializationTask;
 import org.apache.james.task.Hostname;
 import org.apache.james.task.MemoryReferenceTask;
 import org.apache.james.task.Task;
@@ -63,6 +75,7 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
+import org.apache.james.task.TaskWithId;
 import org.apache.james.task.WorkQueue;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
@@ -78,9 +91,13 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import com.rabbitmq.client.AMQP;
 
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.Sender;
 
 class DistributedTaskManagerTest implements TaskManagerContract {
@@ -114,6 +131,9 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     static final DTOConverter<TaskExecutionDetails.AdditionalInformation, AdditionalInformationDTO> TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER = DTOConverter.of(ADDITIONAL_INFORMATION_MODULE);
     static final Hostname HOSTNAME = new Hostname("foo");
     static final Hostname HOSTNAME_2 = new Hostname("bar");
+    static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    static final Task TASK = new CompletedTask();
+    static final TaskWithId TASK_WITH_ID = new TaskWithId(TASK_ID, TASK);
 
     @RegisterExtension
     static final RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ();
@@ -132,6 +152,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
 
     ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules =
         ImmutableSet.of(
+            TestTaskDTOModules.FAILS_DESERIALIZATION_TASK_MODULE,
             TestTaskDTOModules.COMPLETED_TASK_MODULE,
             TestTaskDTOModules.FAILED_TASK_MODULE,
             TestTaskDTOModules.THROWING_TASK_MODULE,
@@ -337,7 +358,7 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
-    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) throws Exception {
+    void givenTwoTaskManagerIfTheFirstOneIsDownTheSecondOneShouldBeAbleToRunTheRemainingTasks(CountDownLatch countDownLatch) {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {
             ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager = ImmutableBiMap.of(taskManager1, HOSTNAME, taskManager2, HOSTNAME_2);
@@ -364,6 +385,105 @@ class DistributedTaskManagerTest implements TaskManagerContract {
         }
     }
 
+    @Test
+    void shouldNotCrashWhenBadMessage() {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        taskManager.submit(new FailsDeserializationTask());
+
+        TaskId id = taskManager.submit(TASK);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED, taskManager);
+    }
+
+    @Test
+    void shouldNotCrashWhenBadMessages() {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        IntStream.range(0, 100).forEach(i -> taskManager.submit(new FailsDeserializationTask()));
+
+        TaskId id = taskManager.submit(TASK);
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.COMPLETED, taskManager);
+    }
+
+    @Test
+    void shouldNotCrashWhenInvalidHeader() throws Exception {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        AMQP.BasicProperties badProperties = new AMQP.BasicProperties.Builder()
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .headers(ImmutableMap.of("abc", TASK_WITH_ID.getId().asString()))
+            .build();
+
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+                ROUTING_KEY, badProperties, taskSerializer.serialize(TASK_WITH_ID.getTask()).getBytes(StandardCharsets.UTF_8))))
+            .block();
+
+        TaskId taskId = taskManager.submit(TASK);
+
+        await().atMost(FIVE_SECONDS).until(() -> taskManager.list(TaskManager.Status.COMPLETED).size() == 1);
+
+        assertThat(taskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+    void shouldNotCrashWhenInvalidTaskId() throws Exception {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        AMQP.BasicProperties badProperties = new AMQP.BasicProperties.Builder()
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .headers(ImmutableMap.of("taskId", "BAD_ID"))
+            .build();
+
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+                ROUTING_KEY, badProperties, taskSerializer.serialize(TASK_WITH_ID.getTask()).getBytes(StandardCharsets.UTF_8))))
+            .block();
+
+        TaskId taskId = taskManager.submit(TASK);
+
+        await().atMost(FIVE_SECONDS).until(() -> taskManager.list(TaskManager.Status.COMPLETED).size() == 1);
+
+        assertThat(taskManager.getExecutionDetails(taskId).getStatus())
+            .isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+    void shouldNotCrashWhenErrorHandlingFails(CassandraCluster cassandra) throws Exception {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        cassandra.getConf().printStatements();
+        cassandra.getConf().registerScenario(Scenario.combine(
+            executeNormally()
+                .times(2) // submit + inProgress
+                .whenQueryStartsWith("INSERT INTO eventStore"),
+            executeNormally()
+                .times(2) // submit + inProgress
+                .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection"),
+            fail()
+                .forever()
+                .whenQueryStartsWith("INSERT INTO eventStore"),
+            fail()
+                .forever()
+                .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection")));
+        taskManager.submit(new FailedTask());
+
+        Thread.sleep(1000);
+
+        cassandra.getConf().registerScenario(Scenario.NOTHING);
+
+        TaskId id2 = taskManager.submit(new CompletedTask());
+
+        awaitUntilTaskHasStatus(id2, TaskManager.Status.COMPLETED, taskManager);
+    }
+
     private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager, Hostname node) {
         return hostNameByTaskManager
             .values()
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
index 4eb0323..5832863 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Awaitility.await;
 import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
 import static org.awaitility.Duration.TWO_SECONDS;
-import static org.mockito.Mockito.spy;
 
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
@@ -67,7 +66,7 @@ class RabbitMQWorkQueueTest {
 
     @BeforeEach
     void setUp() {
-        worker = spy(new ImmediateWorker());
+        worker = new ImmediateWorker();
         serializer = JsonTaskSerializer.of(TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new MemoryReferenceTaskStore()));
         testee = new RabbitMQWorkQueue(worker, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), serializer);
         testee.start();
@@ -160,7 +159,5 @@ class RabbitMQWorkQueueTest {
 
         assertThatThrownBy(() -> await().atMost(FIVE_HUNDRED_MILLISECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).isInstanceOf(ConditionTimeoutException.class);
         assertThatCode(() -> await().atMost(TWO_SECONDS).untilAtomic(counter, CoreMatchers.equalTo(3L))).doesNotThrowAnyException();
-
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to