This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ea6e8843783d7f43b4badc84860b6c3deadbc62e Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Wed Jul 1 15:45:05 2020 +0700 JAMES-3290 Mails failing to load should not be lost by dequeue --- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 12 ++++++-- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 33 ++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index b13b613..27fe748 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -33,6 +33,8 @@ import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.consumers.ThrowingConsumer; @@ -46,6 +48,7 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.Receiver; class Dequeuer implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class); private static final boolean REQUEUE = true; private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { @@ -136,9 +139,14 @@ class Dequeuer implements Closeable { }; } - private Mono<MailWithEnqueueId> loadMail(Delivery response) { + private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) { return toMailReference(response) - .flatMap(mailLoader::load); + .flatMap(reference -> mailLoader.load(reference) + .onErrorResume(e -> { + LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e); + response.nack(REQUEUE); + return Mono.empty(); + })); } private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) { diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index d34a953..6bf316b 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -20,6 +20,7 @@ package org.apache.james.queue.rabbitmq; import static java.time.temporal.ChronoUnit.HOURS; +import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.apache.james.queue.api.Mails.defaultMail; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -296,6 +297,38 @@ class RabbitMQMailQueueTest { })) .blockLast(); } + + @Test + void dequeueShouldRetryLoadingErrors(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + cassandra.getConf().registerScenario(fail() + .times(1) + .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")); + + List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()) + .take(3) + .collectList() + .block(Duration.ofSeconds(10)); + + assertThat(items) + .extracting(item -> item.getMail().getName()) + .containsOnly(name1, name2, name3); + } } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org