This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aa6ac0bf7a5a2403b9ef28a8db50639129ae0007 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Thu Jul 2 16:55:55 2020 +0700 JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages This prevents losing dropped messages --- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 2 ++ .../apache/james/queue/rabbitmq/MailQueueName.java | 10 ++++++++ .../queue/rabbitmq/RabbitMQMailQueueFactory.java | 13 ++++++++++ .../queue/rabbitmq/RabbitMQMailQueueTest.java | 30 +++++++++++++++++++++- 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 481e380..c134798 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -38,6 +38,7 @@ import org.apache.mailet.Mail; import com.fasterxml.jackson.core.JsonProcessingException; import com.github.fge.lambdas.Throwing; +import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.AMQP; import reactor.core.publisher.Mono; @@ -88,6 +89,7 @@ class Enqueuer { .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode()) .priority(PERSISTENT_TEXT_PLAIN.getPriority()) .contentType(PERSISTENT_TEXT_PLAIN.getContentType()) + .headers(ImmutableMap.of("x-dead-letter-routing-key", EMPTY_ROUTING_KEY)) .build(); OutboundMessage data = new OutboundMessage( diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java index dfc352d..f70151e 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java @@ -112,6 +112,8 @@ 
public final class MailQueueName { private static final String PREFIX = "JamesMailQueue"; private static final String EXCHANGE_PREFIX = PREFIX + "-exchange-"; + private static final String DEAD_LETTER_EXCHANGE_PREFIX = PREFIX + "-dead-letter-exchange-"; + private static final String DEAD_LETTER_QUEUE_PREFIX = PREFIX + "-dead-letter-queue-"; @VisibleForTesting static final String WORKQUEUE_PREFIX = PREFIX + "-workqueue-"; public static MailQueueName fromString(String name) { @@ -134,6 +136,14 @@ public final class MailQueueName { return name; } + String toDeadLetterExchangeName() { + return DEAD_LETTER_EXCHANGE_PREFIX + name; + } + + String toDeadLetterQueueName() { + return DEAD_LETTER_QUEUE_PREFIX + name; + } + ExchangeName toRabbitExchangeName() { return new ExchangeName(name); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index e9b3c32..4dd0181 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -46,6 +46,7 @@ import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Flux; @@ -156,14 +157,26 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu sender.declareExchange(ExchangeSpecification.exchange(exchangeName) .durable(true) .type("direct")), + sender.declareExchange(ExchangeSpecification.exchange(mailQueueName.toDeadLetterExchangeName()) + .durable(true) + .type("direct")), 
sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString()) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) + .arguments(ImmutableMap.of("x-dead-letter-exchange", mailQueueName.toDeadLetterExchangeName()))), + sender.declareQueue(QueueSpecification.queue(mailQueueName.toDeadLetterQueueName()) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(!AUTO_DELETE) .arguments(NO_ARGUMENTS)), sender.bind(BindingSpecification.binding() .exchange(mailQueueName.toRabbitExchangeName().asString()) .queue(mailQueueName.toWorkQueueName().asString()) + .routingKey(EMPTY_ROUTING_KEY)), + sender.bind(BindingSpecification.binding() + .exchange(mailQueueName.toDeadLetterExchangeName()) + .queue(mailQueueName.toDeadLetterQueueName()) .routingKey(EMPTY_ROUTING_KEY))) .then() .block(); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 0289266..bf2d209 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -35,6 +35,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -327,7 +328,7 @@ class RabbitMQMailQueueTest { .times(1) .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;")); - List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()) + List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue()) .take(3) .collectList() .block(Duration.ofSeconds(10)); @@ -457,6 +458,33 @@ class RabbitMQMailQueueTest { .untilAsserted(() -> 
assertThat(dequeuedMailNames) .containsExactly(name1, name2, name3)); } + + @Test + void rejectedMessagesShouldBeDeadLettered() { + String emptyRoutingKey = ""; + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", + emptyRoutingKey, + "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))) + .block(); + + AtomicInteger deadLetteredCount = new AtomicInteger(); + rabbitMQExtension.getRabbitChannelPool() + .createReceiver() + .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool") + .doOnNext(next -> deadLetteredCount.incrementAndGet()) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + + Flux.from(getMailQueue().deQueue()) + .doOnNext(Throwing.consumer(item -> item.done(true))) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + + + Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1)); + } } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org