This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b3ecc4645581be9625ea7f4912886f9075459074 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Wed Jul 1 17:08:22 2020 +0700 JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message references a blob that no longer exist. --- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 6 +++ .../queue/rabbitmq/RabbitMQMailQueueTest.java | 49 ++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 27fe748..09a3df6 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.function.Consumer; import org.apache.james.backends.rabbitmq.ReceiverProvider; +import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; @@ -142,6 +143,11 @@ class Dequeuer implements Closeable { private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) { return toMailReference(response) .flatMap(reference -> mailLoader.load(reference) + .onErrorResume(ObjectNotFoundException.class, e -> { + LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e); + response.nack(!REQUEUE); + return Mono.empty(); + }) .onErrorResume(e -> { LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e); response.nack(REQUEUE); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 6bf316b..ee32d8a 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -20,7 +20,9 @@ package org.apache.james.queue.rabbitmq; import static java.time.temporal.ChronoUnit.HOURS; +import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; +import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty; import static org.apache.james.queue.api.Mails.defaultMail; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -72,6 +75,7 @@ import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -329,6 +333,51 @@ class RabbitMQMailQueueTest { .extracting(item -> item.getMail().getName()) .containsOnly(name1, name2, name3); } + + @Test + void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + cassandra.getConf().registerScenario(returnEmpty() + .forever() + .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")); + + ConcurrentLinkedDeque<String> dequeuedNames = new ConcurrentLinkedDeque<>(); + Flux.from(getMailQueue().deQueue()) + .take(3) + .doOnNext(item -> dequeuedNames.add(item.getMail().getName())) + .doOnNext(Throwing.consumer(item -> item.done(true))) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + + // One second should be enough to attempt dequeues while we fail to load blobs + Thread.sleep(1000); + + // Restore normal behaviour + cassandra.getConf().registerScenario(executeNormally() + .forever() + .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")); + + // Let one second to check if the queue is empty + Thread.sleep(1000); + + // We expect content missing blob references to be purged from the queue + assertThat(dequeuedNames).isEmpty(); + } } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org